Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Reproduction case highlighting the requirements for a bufio.Copy implementation
package main
import (
"bytes"
"fmt"
"io"
"net/http"
"os/exec"
)
// main downloads an OpenStreetMap planet extract over HTTP and streams it
// directly into Google Cloud Storage through a gsutil child process.
func main() {
	dl, err := http.Get("http://planet.openstreetmap.org/pbf/planet-150126.osm.pbf")
	if err != nil {
		panic(err)
	}
	defer dl.Body.Close()

	// http.Get only reports transport failures; a 404 or 5xx still comes back
	// with a body (an error page) that must not be uploaded as the planet file.
	if dl.StatusCode != http.StatusOK {
		panic(fmt.Errorf("downloading planet file: unexpected status %s", dl.Status))
	}

	ul, err := newGSUtilWriter("<bucket>", "planet.pbf")
	if err != nil {
		panic(err)
	}

	// Alternative copy implementations under comparison (not imported here):
	// bufioprop.Copy(ul, dl.Body, 64*1024*1024)
	// jnml.Copy(ul, dl.Body, 64*1024*1024)
	if _, err := io.Copy(ul, dl.Body); err != nil {
		// NOTE(review): the writer has no abort path. Once this process exits,
		// gsutil sees EOF on its stdin and may still store the truncated
		// stream. Confirm whether that matters for this repro.
		panic(fmt.Errorf("streaming planet file to GCS: %v", err))
	}

	// Close is checked explicitly: it waits for gsutil and reports a failed
	// exit, which a deferred Close would silently discard.
	if err := ul.Close(); err != nil {
		panic(err)
	}
}
// gsutilWriter is an io.Writer that pipes everything written to it into a
// `gsutil cp -` child process, which performs the upload to Google Cloud
// Storage. Going through gsutil circumvents the token expiration bug.
type gsutilWriter struct {
	command *exec.Cmd      // the running gsutil process
	stdin   io.WriteCloser // pipe connected to gsutil's standard input
	failure *bytes.Buffer  // gsutil's stderr output, used in error messages
}
// newGSUtilWriter starts `gsutil cp - gs://<bucket>/<path>` and returns a
// writer whose data is streamed into that process's standard input, to be
// uploaded as a single GCS object. The writer must be closed to finish the
// upload and learn whether gsutil succeeded.
func newGSUtilWriter(bucket string, path string) (io.WriteCloser, error) {
	// Create a gsutil instance to stream the writer into. exec.Command does
	// not go through a shell, so bucket and path need no quoting.
	cmd := exec.Command("gsutil", "cp", "-", fmt.Sprintf("gs://%s/%s", bucket, path))

	// Let os/exec collect stderr itself. Cmd.Wait then blocks until all of it
	// has been copied, so reading the buffer after Wait is race-free. Draining
	// a StderrPipe from our own goroutine would race with Wait (which closes
	// the pipe) and with the buffer read in Close.
	failure := new(bytes.Buffer)
	cmd.Stderr = failure

	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return &gsutilWriter{
		command: cmd,
		stdin:   stdin,
		failure: failure,
	}, nil
}
// Write hands p unchanged to gsutil's standard input and returns the number
// of bytes the pipe accepted along with any pipe error.
func (w *gsutilWriter) Write(p []byte) (int, error) {
	written, err := w.stdin.Write(p)
	return written, err
}
// Close ends the upload. It closes gsutil's stdin so gsutil sees the end of
// the stream, then waits for the process to exit. A failed exit is reported
// together with whatever gsutil wrote to stderr.
//
// If stdin cannot be closed, gsutil is killed and then waited on, so the
// child is reaped instead of being left behind.
func (w *gsutilWriter) Close() error {
	if err := w.stdin.Close(); err != nil {
		// Best effort cleanup: the close error is what the caller needs, so
		// Kill/Wait results are intentionally ignored.
		w.command.Process.Kill()
		w.command.Wait()
		return err
	}
	// Wait returns only after the process has exited, so no extra Kill is
	// needed on this path.
	if err := w.command.Wait(); err != nil {
		return fmt.Errorf("%v: %s", err, w.failure.String())
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment