Skip to content

Instantly share code, notes, and snippets.

@aokomorowski
Last active August 21, 2023 13:36
Show Gist options
  • Save aokomorowski/0d8387ef40504bdd0d6b900b9a55f978 to your computer and use it in GitHub Desktop.
Save aokomorowski/0d8387ef40504bdd0d6b900b9a55f978 to your computer and use it in GitHub Desktop.
Large files multipart upload to S3 with goroutines using AWS SDK for Go v2
// README before reading further
//
// You probably don't need this code.
// The AWS SDK for Go v2 ships an S3 upload manager that handles multipart uploads for you (I didn't know that when I wrote this).
// It works much like this implementation, but it is far more usable.
//
// See its definition and examples in github.com/aws/aws-sdk-go-v2/feature/s3/manager
package main
import (
	"bytes"
	"context"
	"fmt"
	"os"
	"sort"
	"sync"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
	log "github.com/sirupsen/logrus"
)
type (
	// App holds the S3 client and the destination bucket used for uploads.
	App struct {
		s3Service  s3.Client // held by value (main dereferences the client returned by s3.NewFromConfig)
		bucketName string    // bucket every uploaded object is written to
	}

	// partUploadResult is what an upload worker reports for a single part:
	// the completed part (ETag and part number) on success, or the error
	// that stopped it.
	partUploadResult struct {
		completedPart *s3types.CompletedPart
		err           error
	}
)
/*
Inspired by this article
https://mehranjnf.medium.com/s3-multipart-upload-with-goroutines-92a7aebe831b
*/
func main() {
	// Resolve credentials and region through the SDK's default config chain.
	awsCfg, cfgErr := config.LoadDefaultConfig(context.TODO())
	if cfgErr != nil {
		log.Fatalf("unable to load SDK config, %v", cfgErr)
	}

	// App stores the client by value, so dereference the freshly built client.
	app := App{
		s3Service:  *s3.NewFromConfig(awsCfg),
		bucketName: "bucket-name",
	}

	handleUploadFile(&app, "example.tar.gz")
}
// handleUploadFile opens filename and uploads it to app's bucket via
// uploadFileToBucket. Failures are logged rather than returned, so callers
// get no error signal.
func handleUploadFile(app *App, filename string) {
	dumpFile, err := os.Open(filename)
	if err != nil {
		log.Errorln(err.Error())
		return
	}
	// Release the file descriptor once the upload is done. The file is only
	// read, so a Close error carries no data-loss risk and is ignored.
	defer dumpFile.Close()

	if err := app.uploadFileToBucket(dumpFile); err != nil {
		log.Errorf("Failed uploading file '%s' to S3, err: %v", dumpFile.Name(), err)
	}
}
// uploadFileToBucket uploads file to a.bucketName with S3's multipart upload
// API. The object key is file.Name().
//
// The file is split into chunks of 10 MB by default. The chunk size is grown
// when needed so the file fits within S3's 10,000-part limit. Chunks are
// uploaded concurrently by uploadToS3, with at most maxConcurrentUploads
// chunks held in memory at once. An empty file is stored with a single
// PutObject call, because a multipart upload needs at least one part.
//
// If reading the file, uploading any part, or completing the upload fails,
// the method tries to abort the multipart upload so no orphaned parts are left
// behind, then returns the first error it saw.
func (a *App) uploadFileToBucket(file *os.File) error {
	const (
		defaultChunkSize     = 10_000_000 // 10 MB; S3 requires every part except the last to be at least 5 MiB
		maxUploadParts       = 10_000     // S3 limit on the number of parts in one multipart upload
		maxConcurrentUploads = 5          // caps in-flight parts, and therefore buffered chunks
	)

	log.Infof("Uploading file %s", file.Name())

	stat, err := file.Stat()
	if err != nil {
		return fmt.Errorf("stat %s: %w", file.Name(), err)
	}
	fileSize := stat.Size()

	// CompleteMultipartUpload rejects an empty part list, so store empty files directly.
	if fileSize == 0 {
		_, err = a.s3Service.PutObject(context.TODO(), &s3.PutObjectInput{
			Bucket: aws.String(a.bucketName),
			Key:    aws.String(file.Name()),
			Body:   bytes.NewReader(nil),
		})
		if err != nil {
			return err
		}
		log.Infof("Uploaded file %s\n", file.Name())
		return nil
	}

	chunkSize := int64(defaultChunkSize)
	if fileSize > chunkSize*maxUploadParts {
		// Grow the parts so the whole file fits in S3's part-count limit.
		chunkSize = (fileSize + maxUploadParts - 1) / maxUploadParts
	}
	numParts := int((fileSize + chunkSize - 1) / chunkSize)

	// Initialize multipart upload
	createdResp, err := a.s3Service.CreateMultipartUpload(context.TODO(), &s3.CreateMultipartUploadInput{
		Bucket: aws.String(a.bucketName),
		Key:    aws.String(file.Name()),
	})
	if err != nil {
		log.Errorln("Creating multipart upload failed")
		return err
	}

	// abort cancels the multipart upload (best effort) and returns cause,
	// annotated with the abort error if the abort itself fails.
	abort := func(cause error) error {
		_, abortErr := a.s3Service.AbortMultipartUpload(context.TODO(), &s3.AbortMultipartUploadInput{
			Bucket:   createdResp.Bucket,
			Key:      createdResp.Key,
			UploadId: createdResp.UploadId,
		})
		if abortErr != nil {
			return fmt.Errorf("%w (aborting the multipart upload also failed: %v)", cause, abortErr)
		}
		return cause
	}

	// The channel is buffered for one result per part, so no worker ever
	// blocks on send. That lets us wait for all workers before draining it,
	// and no goroutine can be stranded on an early return.
	ch := make(chan partUploadResult, numParts)
	sem := make(chan struct{}, maxConcurrentUploads)

	var (
		wg      sync.WaitGroup
		readErr error
	)
	for partNum := 1; partNum <= numParts; partNum++ {
		offset := int64(partNum-1) * chunkSize
		size := chunkSize
		if rest := fileSize - offset; rest < size {
			size = rest // the final part may be shorter
		}

		// Wait for a free upload slot before reading the next chunk, so memory
		// use stays near maxConcurrentUploads*chunkSize.
		sem <- struct{}{}
		buffer := make([]byte, size)
		if _, err := file.ReadAt(buffer, offset); err != nil {
			<-sem
			readErr = fmt.Errorf("reading part %d of %s: %w", partNum, file.Name(), err)
			break
		}

		wg.Add(1) // uploadToS3 calls wg.Done
		go func(num int, chunk []byte) {
			defer func() { <-sem }()
			a.uploadToS3(createdResp, chunk, num, &wg, &ch)
		}(partNum, buffer)
	}

	// Once Wait returns, every worker has exited and has already sent
	// whatever result it produced into the buffered channel.
	wg.Wait()
	close(ch)

	completedParts := make([]s3types.CompletedPart, 0, numParts)
	var uploadErr error
	for result := range ch {
		switch {
		case result.err != nil:
			if uploadErr == nil {
				uploadErr = result.err
			}
		case result.completedPart != nil:
			completedParts = append(completedParts, *result.completedPart)
		}
	}

	switch {
	case readErr != nil:
		return abort(readErr)
	case uploadErr != nil:
		return abort(uploadErr)
	case len(completedParts) != numParts:
		// A worker exited without reporting. Completing now could produce an
		// object with a missing chunk.
		return abort(fmt.Errorf("only %d of %d parts of %s were uploaded", len(completedParts), numParts, file.Name()))
	}

	// S3 requires the parts list in ascending part-number order.
	sort.Slice(completedParts, func(i, j int) bool {
		return completedParts[i].PartNumber < completedParts[j].PartNumber
	})

	_, err = a.s3Service.CompleteMultipartUpload(context.TODO(), &s3.CompleteMultipartUploadInput{
		Bucket:   createdResp.Bucket,
		Key:      createdResp.Key,
		UploadId: createdResp.UploadId,
		MultipartUpload: &s3types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		// Don't leave the uploaded parts dangling (and billed) after a failed completion.
		return abort(err)
	}
	log.Infof("Uploaded file %s\n", file.Name())
	return nil
}
func (a *App) uploadToS3(resp *s3.CreateMultipartUploadOutput, b []byte, num int, wg *sync.WaitGroup, ch *chan partUploadResult) {
defer wg.Done()
try := 0
for try <= 2 {
uploadRes, err := a.s3Service.UploadPart(context.TODO(), &s3.UploadPartInput{
Body: bytes.NewReader(b),
Bucket: resp.Bucket,
Key: resp.Key,
PartNumber: int32(num),
UploadId: resp.UploadId,
ContentLength: int64(len(b)),
})
if err != nil {
log.Errorln(err.Error())
if try > 2 {
*ch <- partUploadResult{nil, err}
return
}
try++
continue
}
*ch <- partUploadResult{
&s3types.CompletedPart{
ETag: uploadRes.ETag,
PartNumber: int32(num),
}, nil}
return
}
return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment