Skip to content

Instantly share code, notes, and snippets.

@broady
Last active December 7, 2016 00:50
Show Gist options
  • Save broady/5bdf03eea639c6bc9b3d837da45e3c06 to your computer and use it in GitHub Desktop.
Save broady/5bdf03eea639c6bc9b3d837da45e3c06 to your computer and use it in GitHub Desktop.
package main
import (
"flag"
"fmt"
"io/ioutil"
"path"
"path/filepath"
"strings"
"sync"
"time"
bigquery "google.golang.org/api/bigquery/v2"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
)
// Command-line flags. Each variable is a pointer that holds the flag's default
// until flag.Parse runs.
var (
	// project is the GCP project passed to each query call.
	project = flag.String("project", "", "GCP Project to use")
	// dataset is used as the default dataset ID of every query request.
	dataset = flag.String("dataset", "", "Bigquery Dataset to use")
	// dryRun is forwarded as the DryRun field of every query request; it
	// defaults to true.
	dryRun = flag.Bool("dry_run", true, "Whether to run queries in dry-run mode")
	// queryFiles is a comma-separated list of files containing SQL statements.
	queryFiles = flag.String("query_files", "", "Files with SQL queries to run (comma-separated)")
	// numParallelRuns is how many concurrent copies of each query are issued.
	numParallelRuns = flag.Int("num_parallel_runs", 1, "Number of instances of each query to run in parallel")
)
// main reads every SQL file listed in -query_files, submits each query
// -num_parallel_runs times concurrently via jobsService.Query, waits for all
// calls to return, and prints the start/end timestamps, the total elapsed
// time, and the job IDs collected for each query.
//
// Any setup, file, or API error aborts the program with a panic.
func main() {
	flag.Parse()

	ctx := context.Background()
	cli, err := google.DefaultClient(ctx, bigquery.BigqueryScope)
	if err != nil {
		panic(err)
	}
	bigqueryService, err := bigquery.New(cli)
	if err != nil {
		panic(err)
	}
	jobsService := bigquery.NewJobsService(bigqueryService)

	queries := make(map[string][]byte)
	jobIDs := make(map[string][]string)
	// names keeps the query names in the order they were given so the launch
	// order and the final report are deterministic (map iteration is random).
	var names []string
	// f is shared by the *bool request fields that must be explicitly false.
	f := false

	for _, file := range strings.Split(*queryFiles, ",") {
		file = strings.TrimSpace(file)
		if file == "" {
			// Empty flag value or stray comma: nothing to load.
			continue
		}
		// The query name is the file's base name without its extension.
		// ToSlash makes path.Base work for OS-specific separators as well.
		base := path.Base(filepath.ToSlash(file))
		queryName := strings.TrimSuffix(base, filepath.Ext(base))

		sqlStatement, err := ioutil.ReadFile(file)
		if err != nil {
			panic(err)
		}
		// Files sharing a base name map to the same query name; the last one
		// listed wins, but the name is reported only once.
		if _, seen := queries[queryName]; !seen {
			names = append(names, queryName)
		}
		queries[queryName] = sqlStatement
		jobIDs[queryName] = make([]string, *numParallelRuns)
	}
	if len(names) == 0 {
		panic(fmt.Errorf("no query files given; set -query_files"))
	}

	var wg sync.WaitGroup
	totalStart := time.Now()
	fmt.Println("Start: ", totalStart.Format(time.RFC3339Nano))

	for _, name := range names {
		sqlStatement := queries[name]
		for i := 0; i < *numParallelRuns; i++ {
			wg.Add(1)
			go func(name string, i int, sqlStatement []byte) {
				defer wg.Done()
				q := &bigquery.QueryRequest{
					DefaultDataset: &bigquery.DatasetReference{
						DatasetId: *dataset,
					},
					DryRun:        *dryRun,
					Query:         string(sqlStatement),
					TimeoutMs:     120000,
					UseLegacySql:  &f,
					UseQueryCache: &f,
				}
				resp, err := jobsService.Query(*project, q).Do()
				if err != nil {
					panic(err)
				}
				// NOTE(review): the response may lack a job reference (possibly
				// for dry runs — confirm); leave the slot empty instead of
				// dereferencing nil.
				if resp.JobReference == nil {
					return
				}
				// The map is only read here and every goroutine writes to its
				// own slice element, so no locking is needed.
				jobIDs[name][i] = resp.JobReference.JobId
			}(name, i, sqlStatement)
		}
	}
	wg.Wait()

	// Take the end time once so the printed timestamp and the total agree.
	end := time.Now()
	fmt.Println("End: ", end.Format(time.RFC3339Nano))
	totalTime := end.Sub(totalStart)
	fmt.Printf("Total Time: %.2f\n", totalTime.Seconds())

	for _, name := range names {
		fmt.Printf("Job IDs (%s): %s\n", name, strings.Join(jobIDs[name], ", "))
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment