Last active
August 21, 2023 13:36
-
-
Save aokomorowski/0d8387ef40504bdd0d6b900b9a55f978 to your computer and use it in GitHub Desktop.
Large files multipart upload to S3 with goroutines using AWS SDK for Go v2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// README before going down
//
// You don't have to use this:
// AWS SDK for Go v2 has a feature called s3 manager that will handle multipart upload for you (I didn't know that).
// It doesn't vary much from this implementation, but it's way more usable.
//
// Look up its definition and examples at github.com/aws/aws-sdk-go-v2/feature/s3/manager
package main | |
import (
	"bytes"
	"context"
	"errors"
	"io"
	"os"
	"sort"
	"sync"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
	log "github.com/sirupsen/logrus"
)
type App struct { | |
s3Service s3.Client | |
bucketName string | |
} | |
type partUploadResult struct { | |
completedPart *s3types.CompletedPart | |
err error | |
} | |
/*
Inspired by this article:
https://mehranjnf.medium.com/s3-multipart-upload-with-goroutines-92a7aebe831b
*/
func main() { | |
//Initialize AWS Config | |
cfg, err := config.LoadDefaultConfig(context.TODO()) | |
if err != nil { | |
log.Fatalf("unable to load SDK config, %v", err) | |
} | |
s3Service := s3.NewFromConfig(cfg) | |
app := App{ | |
s3Service: *s3Service, | |
bucketName: "bucket-name", | |
} | |
handleUploadFile(&app, "example.tar.gz") | |
} | |
func handleUploadFile(app *App, filename string) { | |
dumpFile, err := os.Open(filename) | |
if err != nil { | |
log.Errorln(err.Error()) | |
return | |
} | |
err = app.uploadFileToBucket(dumpFile) | |
if err != nil { | |
log.Errorf("Failed uploading file '%s' to S3, err: %v", dumpFile.Name(), err.Error()) | |
} | |
return | |
} | |
func (a *App) uploadFileToBucket(file *os.File) error { | |
log.Infof("Uploading file %s", file.Name()) | |
defaultChuckSize := 10_000_000 // about 90 MB | |
wg := sync.WaitGroup{} | |
ch := make(chan partUploadResult) | |
stat, _ := file.Stat() | |
fileSize := stat.Size() | |
// Initialize multipart upload | |
createdResp, err := a.s3Service.CreateMultipartUpload(context.TODO(), &s3.CreateMultipartUploadInput{ | |
Bucket: aws.String(a.bucketName), | |
Key: aws.String(file.Name()), | |
}) | |
if err != nil { | |
log.Errorln("Creating multipart upload failed") | |
return err | |
} | |
var chunkSizeToUpload int | |
var remainingBytes = int(fileSize) | |
var partNum = 1 | |
var completedParts []s3types.CompletedPart | |
// read a file from start by chunks | |
for start := 0; remainingBytes > 0; start += defaultChuckSize { | |
wg.Add(1) | |
chunkSizeToUpload = defaultChuckSize | |
if remainingBytes < defaultChuckSize { | |
chunkSizeToUpload = remainingBytes | |
} | |
buffer := make([]byte, chunkSizeToUpload) | |
_, err := file.ReadAt(buffer, int64(start)) | |
if err != nil { | |
log.Errorln(err.Error()) | |
} | |
remainingBytes -= chunkSizeToUpload | |
go a.uploadToS3(createdResp, buffer, partNum, &wg, &ch) | |
partNum++ | |
} | |
go func() { | |
wg.Wait() | |
close(ch) | |
}() | |
for result := range ch { | |
if result.err != nil { | |
_, err = a.s3Service.AbortMultipartUpload(context.TODO(), &s3.AbortMultipartUploadInput{ | |
Bucket: aws.String(a.bucketName), | |
Key: aws.String(file.Name()), | |
UploadId: createdResp.UploadId, | |
}) | |
if err != nil { | |
return err | |
} | |
} | |
completedParts = append(completedParts, *result.completedPart) | |
} | |
sort.Slice(completedParts, func(i, j int) bool { | |
return completedParts[i].PartNumber < completedParts[j].PartNumber | |
}) | |
_, err = a.s3Service.CompleteMultipartUpload(context.TODO(), &s3.CompleteMultipartUploadInput{ | |
Bucket: createdResp.Bucket, | |
Key: createdResp.Key, | |
UploadId: createdResp.UploadId, | |
MultipartUpload: &s3types.CompletedMultipartUpload{ | |
Parts: completedParts, | |
}, | |
}) | |
if err != nil { | |
return err | |
} | |
log.Infof("Uploaded file %s\n", file.Name()) | |
return nil | |
} | |
func (a *App) uploadToS3(resp *s3.CreateMultipartUploadOutput, b []byte, num int, wg *sync.WaitGroup, ch *chan partUploadResult) { | |
defer wg.Done() | |
try := 0 | |
for try <= 2 { | |
uploadRes, err := a.s3Service.UploadPart(context.TODO(), &s3.UploadPartInput{ | |
Body: bytes.NewReader(b), | |
Bucket: resp.Bucket, | |
Key: resp.Key, | |
PartNumber: int32(num), | |
UploadId: resp.UploadId, | |
ContentLength: int64(len(b)), | |
}) | |
if err != nil { | |
log.Errorln(err.Error()) | |
if try > 2 { | |
*ch <- partUploadResult{nil, err} | |
return | |
} | |
try++ | |
continue | |
} | |
*ch <- partUploadResult{ | |
&s3types.CompletedPart{ | |
ETag: uploadRes.ETag, | |
PartNumber: int32(num), | |
}, nil} | |
return | |
} | |
return | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment