Skip to content

Instantly share code, notes, and snippets.

@kbanman
Created April 10, 2021 03:32
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save kbanman/0aa36ffe415cdc6c44293bc3ddb6448e to your computer and use it in GitHub Desktop.
Save kbanman/0aa36ffe415cdc6c44293bc3ddb6448e to your computer and use it in GitHub Desktop.
const uploader = new S3UploadStream(s3, {
Bucket: bucketName,
ContentType: contentType,
Key: key,
})
stream.pipe(uploader)
return new Promise((resolve, reject) => {
uploader.on('uploadFinished', (result: CompleteMultipartUploadCommandOutput, fileSizeBytes: number) => {
resolve()
})
uploader.on('error', err => {
reject(err)
})
})
import {
CompleteMultipartUploadCommand,
CompleteMultipartUploadCommandOutput,
CreateMultipartUploadCommand,
CreateMultipartUploadCommandInput,
S3Client,
UploadPartCommand,
} from '@aws-sdk/client-s3'
import {Writable} from 'stream'
// S3 requires all but the last part to be at least 5MB
const BUFFER_SIZE = 5242880
export class S3UploadStream extends Writable {
private buffer = []
private bufferLength = 0
private totalLength = 0
private started = false
private promiseQueue: Promise<void>
private uploadId: string
private uploadedParts: { ETag: string, PartNumber: number }[] = []
private numParts = 0
constructor(private s3: S3Client,
private options: CreateMultipartUploadCommandInput) {
super()
}
async startUpload() {
try {
const {UploadId} = await this.s3.send(new CreateMultipartUploadCommand(this.options))
this.uploadId = UploadId
} catch (err) {
throw new Error(`Failed to create S3 MPU: ${err}`)
}
}
ensureUploadStarted() {
if (this.started) {
return
}
this.started = true
this.promiseQueue = this.startUpload()
}
_write(chunk: any, encoding: BufferEncoding, callback: Callback) {
this.ensureUploadStarted()
this.buffer.push(chunk)
this.bufferLength += chunk.length
if (this.bufferLength >= BUFFER_SIZE) {
this.emitPart()
.then(() => callback())
.catch(err => callback(err))
return
}
callback()
}
_final(callback: Callback) {
this.emitPart()
.then(() => this.completeUpload())
.then(output => this.emit('uploadFinished', output, this.totalLength))
.then(() => callback())
.catch(err => callback(err))
}
async uploadPart(part: number, chunk: Buffer) {
await this.promiseQueue
const {ETag} = await this.s3.send(new UploadPartCommand({
Bucket: this.options.Bucket,
Key: this.options.Key,
UploadId: this.uploadId,
PartNumber: part,
Body: chunk,
}))
this.uploadedParts.push({
ETag: ETag,
PartNumber: part,
})
}
private emitPart() {
if (!this.bufferLength) {
return
}
this.totalLength += this.bufferLength
const partNumber = ++this.numParts;
const partContent = Buffer.concat(this.buffer, this.bufferLength)
// Reset
this.buffer = []
this.bufferLength = 0
return this.uploadPart(partNumber, partContent)
}
private async completeUpload() {
await this.promiseQueue
let res: CompleteMultipartUploadCommandOutput
try {
res = await this.s3.send(new CompleteMultipartUploadCommand({
Bucket: this.options.Bucket,
Key: this.options.Key,
MultipartUpload: {
Parts: this.uploadedParts,
},
UploadId: this.uploadId,
}))
} catch (err) {
throw new Error(`CompleteMultipartUpload failed: ${err}`)
}
return res
}
}
type Callback = (error?: (Error | null)) => void
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment