Last active
December 7, 2016 00:50
-
-
Save broady/5bdf03eea639c6bc9b3d837da45e3c06 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"path"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"golang.org/x/net/context"
	"golang.org/x/oauth2/google"
	bigquery "google.golang.org/api/bigquery/v2"
)
// Command-line flags. They are registered at package initialization and hold
// their parsed values once main has called flag.Parse.
var (
	// project is the GCP project the queries are issued against.
	project = flag.String("project", "", "GCP Project to use")

	// dataset is used as the default dataset of every query request.
	dataset = flag.String("dataset", "", "Bigquery Dataset to use")

	// dryRun is forwarded as the DryRun field of every query request.
	// It defaults to true.
	dryRun = flag.Bool("dry_run", true, "Whether to run queries in dry-run mode")

	// queryFiles is a comma-separated list of files, each holding one SQL
	// statement.
	queryFiles = flag.String("query_files", "", "Files with SQL queries to run (comma-separated)")

	// numParallelRuns is how many concurrent copies of each query are started.
	numParallelRuns = flag.Int("num_parallel_runs", 1, "Number of instances of each query to run in parallel")
)
func main() { | |
flag.Parse() | |
ctx := context.Background() | |
cli, _ := google.DefaultClient(ctx, bigquery.BigqueryScope) | |
bigqueryService, _ := bigquery.New(cli) | |
jobsService := bigquery.NewJobsService(bigqueryService) | |
queries := make(map[string][]byte) | |
jobIDs := make(map[string][]string) | |
f := false | |
for _, file := range strings.Split(*queryFiles, ",") { | |
queryName := path.Base(file) | |
queryName = queryName[0 : len(queryName)-len(filepath.Ext(queryName))] | |
sqlStatement, err := ioutil.ReadFile(file) | |
if err != nil { | |
panic(err) | |
} | |
queries[queryName] = sqlStatement | |
jobIDs[queryName] = make([]string, *numParallelRuns) | |
} | |
var wg sync.WaitGroup | |
totalStart := time.Now() | |
fmt.Println("Start: ", totalStart.Format(time.RFC3339Nano)) | |
for name, sqlStatement := range queries { | |
for i := 0; i < *numParallelRuns; i++ { | |
wg.Add(1) | |
go func(name string, i int, sqlStatement []byte) { | |
defer wg.Done() | |
q := &bigquery.QueryRequest{ | |
DefaultDataset: &bigquery.DatasetReference{ | |
DatasetId: *dataset, | |
}, | |
DryRun: *dryRun, | |
Query: string(sqlStatement), | |
TimeoutMs: 120000, | |
UseLegacySql: &f, | |
UseQueryCache: &f, | |
} | |
resp, err := jobsService.Query(*project, q).Do() | |
if err != nil { | |
panic(err) | |
} | |
jobIDs[name][i] = resp.JobReference.JobId | |
}(name, i, sqlStatement) | |
} | |
} | |
wg.Wait() | |
fmt.Println("End: ", time.Now().Format(time.RFC3339Nano)) | |
totalTime := time.Now().Sub(totalStart) | |
fmt.Printf("Total Time: %.2f\n", totalTime.Seconds()) | |
for name, _ := range queries { | |
fmt.Printf("Job IDs (%s): %s\n", name, strings.Join(jobIDs[name], ", ")) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment