@tehmoon
Created December 19, 2017 02:30
Download from S3 in a streaming fashion
package main

import (
	"fmt"
	"io"
	"net/url"
	"os"
	"strings"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/tehmoon/errors"
)

// This example shows how to download from S3 using the AWS SDK in a
// streaming fashion, meaning the object is never fully buffered in memory
// or written to disk. It works by "tricking" the WriterAt interface that
// s3manager.Downloader expects: each chunk is written to an io.Pipe()
// instead. Chunks obviously have to arrive in order for the stream to make
// sense, which is why d.Concurrency is set to 1. With a single worker, the
// AWS SDK should deliver chunks sequentially. This is more or less confirmed
// by reading the code:
// https://github.com/aws/aws-sdk-go/blob/f9f7ea2d31dc34870074633a73c622293ad37478/service/s3/s3manager/download.go#L273
// starts one worker, and this line
// https://github.com/aws/aws-sdk-go/blob/f9f7ea2d31dc34870074633a73c622293ad37478/service/s3/s3manager/download.go#L302
// always advances the chunks sequentially. Since we only have one worker, we
// are fine.
// As a safety net, the wrapper maintains an offset and returns an error if
// WriteAt tries to write anywhere other than the current offset.

var (
	ErrInoutS3MissingBucket = errors.New("Bucket in url is missing")
	ErrInoutS3MissingKey    = errors.New("Key in url is missing")
)

func main() {
	sess := session.Must(session.NewSession())

	u, err := url.Parse("s3://bucket/path/to/key")
	if err != nil {
		fmt.Fprintln(os.Stderr, err.Error())
		os.Exit(1)
	}

	bucket, key, err := S3FromUrl(u)
	if err != nil {
		fmt.Fprintln(os.Stderr, err.Error())
		os.Exit(1)
	}

	sync := make(chan error)
	streamer := NewS3DownloadStream()
	downloader := s3manager.NewDownloader(sess, func(d *s3manager.Downloader) {
		// One worker only, so chunks are written to the pipe in order.
		d.Concurrency = 1
	})

	dlParams := &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	}

	// Feed the pipe from S3, then close it (possibly with the download
	// error) so the reading side terminates.
	go func() {
		_, err := downloader.Download(streamer, dlParams)
		streamer.CloseWithError(err)
	}()

	// Drain the pipe to stdout and report the first error, if any.
	go func() {
		_, err := io.Copy(os.Stdout, streamer)
		sync <- err
	}()

	err = <-sync
	if err != nil {
		fmt.Fprintln(os.Stderr, err.Error())
		os.Exit(1)
	}
}

func S3FromUrl(u *url.URL) (string, string, error) {
	bucket := u.Host
	if bucket == "" {
		return "", "", ErrInoutS3MissingBucket
	}

	// u.Path keeps its leading "/", which is not part of the S3 key.
	key := strings.TrimPrefix(u.Path, "/")
	if key == "" {
		return "", "", ErrInoutS3MissingKey
	}

	return bucket, key, nil
}
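
// Hypothetical quick check of S3FromUrl (not part of the original gist; the
// bucket and key names below are made up):
//
//	u, _ := url.Parse("s3://my-bucket/logs/2017/12/19.gz")
//	bucket, key, _ := S3FromUrl(u)
//	// bucket == "my-bucket", key == "logs/2017/12/19.gz"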
type S3DownloadStream struct {
	reader *io.PipeReader
	writer *io.PipeWriter
	offset int64
}

func NewS3DownloadStream() *S3DownloadStream {
	reader, writer := io.Pipe()

	return &S3DownloadStream{
		reader: reader,
		writer: writer,
		offset: 0,
	}
}

// Read makes the stream usable as an io.Reader (e.g. by io.Copy).
func (s *S3DownloadStream) Read(p []byte) (int, error) {
	return s.reader.Read(p)
}

// WriteAt satisfies io.WriterAt for s3manager.Downloader but only accepts
// sequential writes: the requested offset must match the number of bytes
// written so far.
func (s *S3DownloadStream) WriteAt(p []byte, off int64) (int, error) {
	if s.offset != off {
		return 0, fmt.Errorf("WriteAt: expected offset %d, got %d", s.offset, off)
	}

	n, err := s.writer.Write(p)
	if err != nil {
		return n, err
	}

	s.offset += int64(n)

	return n, nil
}

func (s *S3DownloadStream) Close() error {
	return s.writer.Close()
}

// CloseWithError closes the write side of the pipe; the reader sees err, or
// io.EOF if err is nil.
func (s *S3DownloadStream) CloseWithError(err error) error {
	return s.writer.CloseWithError(err)
}
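
A minimal way to see the offset guard in action (a sketch, not part of the original gist): assuming the code above lives in package main, drop the following into a _test.go file in the same directory and run go test. The reader side must be drained in a goroutine because io.Pipe blocks each write until it has been read.

package main

import (
	"io"
	"io/ioutil"
	"testing"
)

// Sequential writes go through; an out-of-order chunk is rejected.
func TestS3DownloadStreamWriteAt(t *testing.T) {
	s := NewS3DownloadStream()

	// Drain the pipe so writes don't block forever.
	go io.Copy(ioutil.Discard, s)

	if _, err := s.WriteAt([]byte("abc"), 0); err != nil {
		t.Fatalf("sequential write failed: %s", err.Error())
	}

	// s.offset is now 3, so writing at offset 10 must fail.
	if _, err := s.WriteAt([]byte("def"), 10); err == nil {
		t.Fatal("out-of-order write should have been rejected")
	}
}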