Skip to content

Instantly share code, notes, and snippets.

@rosmo
Last active December 30, 2021 22:32
Show Gist options
  • Save rosmo/a6469ea80cf1375142cca0a7d6e89442 to your computer and use it in GitHub Desktop.
Save rosmo/a6469ea80cf1375142cca0a7d6e89442 to your computer and use it in GitHub Desktop.
gcs2bq wrapper in golang
# To build and deploy:
#
# docker build -t gcr.io/YOUR-PROJECT/gsc2bq-cloudrun:latest .
# docker push gcr.io/YOUR-PROJECT/gsc2bq-cloudrun:latest
#
# gcloud run deploy gcs2bq --image=gcr.io/YOUR-PROJECT/gsc2bq-cloudrun:latest \
# --service-account=your-sa@YOUR-PROJECT.iam.gserviceaccount.com \
# --timeout=60m --set-env-vars=GCS2BQ_PROJECT=YOUR-PROJECT,... \
# --region=europe-west4
#
# Test invocation with:
# curl -NH "Authorization: Bearer $(gcloud auth print-identity-token)" https://your-cloud-run-function.run.app
# Build stage: fetch and build the upstream gcs2bq tool, then compile the
# HTTP wrapper (main.go) into /main.
FROM golang:1.17-alpine3.15 as build
RUN go get github.com/GoogleCloudPlatform/professional-services/tools/gcs2bq
COPY main.go /
RUN go build -o /main /main.go

# Runtime stage: the Cloud SDK image supplies the gsutil and bq commands that
# the wrapper shells out to.
FROM google/cloud-sdk:alpine
COPY --from=build /go/bin/gcs2bq /bin/gcs2bq
COPY --from=build /main /bin/gcs2bq-entrypoint
# Schema files are downloaded into the filesystem root; the wrapper passes
# "--schema=bigquery.schema" as a relative path to bq.
RUN curl -Lso /bigquery.schema https://github.com/GoogleCloudPlatform/professional-services/raw/main/tools/gcs2bq/bigquery.schema
RUN curl -Lso /gcs2bq.avsc https://raw.githubusercontent.com/GoogleCloudPlatform/professional-services/main/tools/gcs2bq/gcs2bq.avsc
EXPOSE 8080
# Exec form runs the server directly instead of under "/bin/sh -c", so it
# receives stop signals (SIGTERM) from the container runtime itself.
ENTRYPOINT ["/bin/gcs2bq-entrypoint"]
// This is the old version, which does not stream command output into the HTTP response.
package main
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"path"
"syscall"
)
// main registers handler for every path and serves HTTP on the port named by
// the PORT environment variable, falling back to 8080 when it is unset or empty.
func main() {
	log.Print("Starting function...")
	http.HandleFunc("/", handler)

	// Resolve the listening port, logging only when the fallback is used.
	port := "8080"
	if fromEnv := os.Getenv("PORT"); fromEnv != "" {
		port = fromEnv
	} else {
		log.Printf("Defaulting to port %s", port)
	}

	// Serve until the listener fails; any such failure is fatal.
	log.Printf("Listening on port %s", port)
	addr := ":" + port
	err := http.ListenAndServe(addr, nil)
	if err != nil {
		log.Fatal(err)
	}
}
// runCommand executes name with args, capturing stdout and stderr in memory.
//
// On success nothing further is logged. On failure the captured output is
// logged, and then:
//   - if canFail is true, the error is logged as a warning and the function
//     returns;
//   - otherwise, if the process exited with one of allowedExitCodes, the
//     function returns;
//   - otherwise the process is terminated via log.Fatalln.
func runCommand(canFail bool, allowedExitCodes []int, name string, args ...string) {
	log.Printf("Running: %s %+q", name, args)

	var outBuf, errBuf bytes.Buffer
	cmd := exec.Command(name, args...)
	cmd.Stdout, cmd.Stderr = &outBuf, &errBuf

	runErr := cmd.Run()
	if runErr == nil {
		return
	}

	// The command failed: surface whatever it printed before deciding
	// whether the failure is tolerable.
	if captured := outBuf.String(); captured != "" {
		log.Println("stdout:", captured)
	}
	if captured := errBuf.String(); captured != "" {
		log.Println("stderr:", captured)
	}

	if canFail {
		log.Println("Warning:", runErr)
		return
	}

	exitErr, isExitErr := runErr.(*exec.ExitError)
	if isExitErr {
		waitStatus, isWaitStatus := exitErr.Sys().(syscall.WaitStatus)
		if isWaitStatus {
			code := waitStatus.ExitStatus()
			for _, allowed := range allowedExitCodes {
				if allowed == code {
					return
				}
			}
			log.Printf("Process exited with status code: %d", code)
		}
	}
	log.Fatalln("Error:", runErr)
}
// handler runs the whole export pipeline once per HTTP request:
//
//  1. run gcs2bq, telling it to write its output to a fresh .avro path in /tmp,
//  2. create the staging bucket (allowed to fail, e.g. if it already exists),
//  3. copy the Avro file into the bucket,
//  4. create the BigQuery dataset (allowed to fail),
//  5. load the staged file into the BigQuery table, replacing its contents,
//  6. delete the staged object from the bucket (allowed to fail).
//
// Configuration is read from the GCS2BQ_PROJECT, GCS2BQ_DATASET, GCS2BQ_TABLE,
// GCS2BQ_BUCKET, GCS2BQ_LOCATION and GCS2BQ_VERSIONS environment variables.
// Nothing is written to w; progress goes to the process log only. Any step
// that is not allowed to fail terminates the process through runCommand.
func handler(w http.ResponseWriter, r *http.Request) {
	binary := "/bin/gcs2bq"
	projectID := os.Getenv("GCS2BQ_PROJECT")
	bigqueryDataset := os.Getenv("GCS2BQ_DATASET")
	bigqueryTable := os.Getenv("GCS2BQ_TABLE")
	bucket := os.Getenv("GCS2BQ_BUCKET")
	location := os.Getenv("GCS2BQ_LOCATION")

	// The temp file is only used to reserve a unique name.
	file, err := ioutil.TempFile("/tmp", "gcs2bq")
	if err != nil {
		log.Fatal(err)
	}
	// Close the descriptor so it is not leaked on every request.
	file.Close()
	// Remove the file right away since gcs2bq expects it not to be there
	os.Remove(file.Name())
	filename := fmt.Sprintf("%s.avro", file.Name())
	// Clean up the .avro file that gcs2bq is told to write, not the
	// placeholder that was already removed above. (Deferred calls do not run
	// if a later step ends the process via log.Fatal.)
	defer os.Remove(filename)

	args := []string{"-logtostderr", "-file", filename}
	if os.Getenv("GCS2BQ_VERSIONS") != "" {
		args = append(args, "-versions")
	}
	// Exit codes 2 and 3 from gcs2bq are tolerated in addition to success.
	runCommand(false, []int{0, 2, 3}, binary, args...)

	mkBucketArgs := []string{"mb", "-p", projectID, "-c", "standard", "-l", location, "-b", "on", fmt.Sprintf("gs://%s", bucket)}
	runCommand(true, []int{0}, "gsutil", mkBucketArgs...)

	baseFilename := path.Base(filename)
	copyFileArgs := []string{"cp", filename, fmt.Sprintf("gs://%s/%s", bucket, baseFilename)}
	runCommand(false, []int{0}, "gsutil", copyFileArgs...)

	bqMakeDatasetArgs := []string{"mk", "--project_id", projectID, "--location", location, bigqueryDataset}
	runCommand(true, []int{0}, "bq", bqMakeDatasetArgs...)

	bqLoadArgs := []string{"load", fmt.Sprintf("--project_id=%s", projectID), fmt.Sprintf("--location=%s", location), "--schema=bigquery.schema", "--source_format=AVRO", "--use_avro_logical_types", "--replace=true", fmt.Sprintf("%s.%s", bigqueryDataset, bigqueryTable), fmt.Sprintf("gs://%s/%s", bucket, baseFilename)}
	runCommand(false, []int{0}, "bq", bqLoadArgs...)

	removeFileArgs := []string{"rm", fmt.Sprintf("gs://%s/%s", bucket, baseFilename)}
	runCommand(true, []int{0}, "gsutil", removeFileArgs...)

	log.Println("All tasks finished.")
}
// This is the new version, which streams each command's stdout and stderr
// into the HTTP response as it runs, so prefer this one.
package main
import (
"bufio"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"path"
"syscall"
"time"
)
// main wires handler to the root path and starts the HTTP server. The port
// comes from the PORT environment variable; 8080 is used when it is empty.
func main() {
	log.Print("Starting function...")
	http.HandleFunc("/", handler)

	// Pick the port, announcing the fallback when PORT is not provided.
	port := os.Getenv("PORT")
	if len(port) == 0 {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	// Block serving requests; a returned error means the server could not run.
	log.Printf("Listening on port %s", port)
	listenAddr := ":" + port
	if serveErr := http.ListenAndServe(listenAddr, nil); serveErr != nil {
		log.Fatal(serveErr)
	}
}
// writeProgress writes message followed by a newline to the HTTP response and
// flushes immediately, so the client sees it while the request is still
// running.
func writeProgress(w http.ResponseWriter, flusher http.Flusher, message string) {
	fmt.Fprintf(w, "%s\n", message)
	flusher.Flush()
}

// runCommand executes name with args and streams every line the command
// prints on stdout or stderr to the HTTP response via writeProgress. While
// the command is running, a "Still waiting" line is emitted once per second.
//
// Outcome handling:
//   - success: a "Command complete" line is written and the function returns;
//   - failure with canFail set (including failure to start the command): the
//     error is logged as a warning and the function returns;
//   - failure with an exit code listed in allowedExitCodes: the function
//     returns silently;
//   - any other failure terminates the process via log.Fatalln.
//
// All writes to w happen on the calling goroutine. Both reader goroutines are
// guaranteed to have finished before the function returns.
func runCommand(w http.ResponseWriter, flusher http.Flusher, canFail bool, allowedExitCodes []int, name string, args ...string) {
	log.Printf("Running: %s %+q", name, args)
	writeProgress(w, flusher, fmt.Sprintf("Running command: %s", name))

	cmd := exec.Command(name, args...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Fatalln("Error getting stdout pipe:", err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		log.Fatalln("Error getting stderr pipe:", err)
	}
	if err := cmd.Start(); err != nil {
		// A command that is allowed to fail is also allowed to be missing.
		if canFail {
			log.Println("Warning:", err)
			return
		}
		log.Fatalln("Error starting command:", err)
	}

	lines := make(chan string)
	readerDone := make(chan struct{})
	forward := func(scanner *bufio.Scanner, stream string) {
		for scanner.Scan() {
			lines <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			log.Printf("Error reading %s of %s: %v", stream, name, err)
		}
		readerDone <- struct{}{}
	}
	go forward(bufio.NewScanner(stdout), "stdout")
	go forward(bufio.NewScanner(stderr), "stderr")

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	// cmd.Wait closes the pipes, so it must not be called until both readers
	// have drained them; done stays nil (and its case never fires) until then.
	var done chan error
	pendingReaders := 2
	for {
		select {
		case line := <-lines:
			writeProgress(w, flusher, line)
		case <-readerDone:
			pendingReaders--
			if pendingReaders == 0 {
				done = make(chan error, 1)
				go func() { done <- cmd.Wait() }()
			}
		case <-ticker.C:
			writeProgress(w, flusher, fmt.Sprintf("Still waiting to complete: %s", name))
		case err := <-done:
			if err == nil {
				writeProgress(w, flusher, fmt.Sprintf("Command complete: %s", name))
				return
			}
			if canFail {
				log.Println("Warning:", err)
				return
			}
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					for _, exitcode := range allowedExitCodes {
						if status.ExitStatus() == exitcode {
							return
						}
					}
					log.Printf("Process exited with status code: %d", status.ExitStatus())
				}
			}
			log.Fatalln("Error:", err)
		}
	}
}
// handler runs the whole export pipeline once per HTTP request and streams
// progress to the client as plain text lines:
//
//  1. run gcs2bq, telling it to write its output to a fresh .avro path in /tmp,
//  2. create the staging bucket (allowed to fail, e.g. if it already exists),
//  3. copy the Avro file into the bucket,
//  4. create the BigQuery dataset (allowed to fail),
//  5. load the staged file into the BigQuery table, replacing its contents,
//  6. delete the staged object from the bucket (allowed to fail).
//
// Configuration is read from the GCS2BQ_PROJECT, GCS2BQ_DATASET, GCS2BQ_TABLE,
// GCS2BQ_BUCKET, GCS2BQ_LOCATION and GCS2BQ_VERSIONS environment variables.
// The 200 status is sent before any work starts, so later failures cannot
// change the status code; steps that are not allowed to fail terminate the
// process through runCommand.
func handler(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		// Streaming needs a flushable writer; this is a server-side
		// limitation, not a missing resource.
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Transfer-Encoding", "chunked")
	w.WriteHeader(http.StatusOK)
	flusher.Flush()

	binary := "/bin/gcs2bq"
	projectID := os.Getenv("GCS2BQ_PROJECT")
	bigqueryDataset := os.Getenv("GCS2BQ_DATASET")
	bigqueryTable := os.Getenv("GCS2BQ_TABLE")
	bucket := os.Getenv("GCS2BQ_BUCKET")
	location := os.Getenv("GCS2BQ_LOCATION")

	// The temp file is only used to reserve a unique name.
	file, err := ioutil.TempFile("/tmp", "gcs2bq")
	if err != nil {
		log.Fatal(err)
	}
	// Close the descriptor so it is not leaked on every request.
	file.Close()
	// Remove the file right away since gcs2bq expects it not to be there
	os.Remove(file.Name())
	filename := fmt.Sprintf("%s.avro", file.Name())
	// Clean up the .avro file that gcs2bq is told to write, not the
	// placeholder that was already removed above. (Deferred calls do not run
	// if a later step ends the process via log.Fatal.)
	defer os.Remove(filename)

	args := []string{"-logtostderr", "-file", filename}
	// The -versions flag is opt-in through its own variable; it must not be
	// tied to GCS2BQ_LOCATION, which is set for every deployment.
	if os.Getenv("GCS2BQ_VERSIONS") != "" {
		args = append(args, "-versions")
	}
	// Exit codes 2 and 3 from gcs2bq are tolerated in addition to success.
	runCommand(w, flusher, false, []int{0, 2, 3}, binary, args...)

	mkBucketArgs := []string{"mb", "-p", projectID, "-c", "standard", "-l", location, "-b", "on", fmt.Sprintf("gs://%s", bucket)}
	runCommand(w, flusher, true, []int{0}, "gsutil", mkBucketArgs...)

	baseFilename := path.Base(filename)
	copyFileArgs := []string{"cp", filename, fmt.Sprintf("gs://%s/%s", bucket, baseFilename)}
	runCommand(w, flusher, false, []int{0}, "gsutil", copyFileArgs...)

	bqMakeDatasetArgs := []string{"mk", "--project_id", projectID, "--location", location, bigqueryDataset}
	runCommand(w, flusher, true, []int{0}, "bq", bqMakeDatasetArgs...)

	bqLoadArgs := []string{"load", fmt.Sprintf("--project_id=%s", projectID), fmt.Sprintf("--location=%s", location), "--schema=bigquery.schema", "--source_format=AVRO", "--use_avro_logical_types", "--replace=true", fmt.Sprintf("%s.%s", bigqueryDataset, bigqueryTable), fmt.Sprintf("gs://%s/%s", bucket, baseFilename)}
	runCommand(w, flusher, false, []int{0}, "bq", bqLoadArgs...)

	removeFileArgs := []string{"rm", fmt.Sprintf("gs://%s/%s", bucket, baseFilename)}
	runCommand(w, flusher, true, []int{0}, "gsutil", removeFileArgs...)

	writeProgress(w, flusher, "All tasks finished.")
	log.Println("All tasks finished.")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment