Skip to content

Instantly share code, notes, and snippets.

@weeksie
Created November 7, 2022 20:09
Show Gist options
  • Save weeksie/a93adcdba5f77b2897928188b38a3c30 to your computer and use it in GitHub Desktop.
Save weeksie/a93adcdba5f77b2897928188b38a3c30 to your computer and use it in GitHub Desktop.
A simple
import stream from 'stream';
import {
S3Client,
CreateMultipartUploadCommand,
UploadPartCommand,
CreateMultipartUploadCommandOutput,
CompleteMultipartUploadCommand,
CompletedPart,
DeleteObjectCommand,
AbortMultipartUploadCommand,
} from '@aws-sdk/client-s3';
import {
assertEnv,
} from '@melty/app';
// S3 credentials are read from the process environment through `assertEnv`.
// NOTE(review): `assertEnv` presumably fails fast when a variable is missing —
// confirm against @melty/app.
const { S3_ACCESS_KEY, S3_SECRET_KEY } = assertEnv(
  process.env,
  'S3_ACCESS_KEY',
  'S3_SECRET_KEY',
);

// Module-level S3 client shared by the code in this file.
const client = new S3Client({
  region: 'us-west-2',
  credentials: {
    accessKeyId: S3_ACCESS_KEY,
    secretAccessKey: S3_SECRET_KEY,
  },
});

// URL prefix that is prepended to an object key to build the URL reported for
// a finished upload, and stripped again when an object is removed.
const AWS_PREFIX = 'https://melty-uploads.s3.us-west-2.amazonaws.com/';

// Number of buffered bytes at which a part is sent (5 MiB).
const MIN_CHUNK_SIZE = 5 * 1024 * 1024;

// Bucket that receives every upload.
const Bucket = 'melty-uploads';

/** Constructor options for `S3WritableStream`. */
type S3WritableStreamProps = {
  /** Used verbatim as the S3 object key. */
  fileName: string;
};
/**
 * Writable stream that stores everything written to it in S3 using a
 * multipart upload.
 *
 * Incoming chunks are buffered until at least `MIN_CHUNK_SIZE` bytes are
 * available and are then sent as one part. Part uploads are started without
 * waiting for earlier parts to finish, so several parts may be in flight at
 * once. The multipart upload is completed when the stream is ended
 * (`end()` / `pipe()`), after which the standard `'finish'` event fires.
 *
 * Events emitted in addition to the standard `Writable` events:
 * - `'start'`    — once, when the first chunk arrives.
 * - `'progress'` — `(received: number, uploaded: number)`; one of the two
 *                  byte counts is always 0. `received` is reported per
 *                  chunk written, `uploaded` per part acknowledged by S3.
 * - `'done'`     — `(url: string)` once the object has been assembled.
 *
 * Any S3 failure is reported through the normal stream error path (the
 * `'error'` event). A failed upload is NOT aborted automatically; call
 * `cancelUpload()` to discard the parts already stored.
 */
export class S3WritableStream extends stream.Writable {
  /** Bytes currently held in `chunks` and not yet sent as a part. */
  buffered: number;
  /** Never assigned by this class; kept so existing references still compile. */
  encoding?: BufferEncoding;
  /** One promise per part that has been sent, in part-number order. */
  s3Parts: Promise<CompletedPart>[];
  client: S3Client;
  /** Chunks received since the last part was sent. */
  chunks: Buffer[];
  /** True once the first chunk has been seen. */
  started: boolean;
  /**
   * The pending or settled CreateMultipartUpload request. `finishUpload`
   * waits on it so it never runs without an upload id, even when it is
   * called directly while the request is still in flight.
   */
  ack?: Promise<CreateMultipartUploadCommandOutput>;
  fileName: string;
  uploadId?: string;

  /**
   * @param props.fileName Used verbatim as the S3 object key.
   */
  constructor({ fileName }: S3WritableStreamProps) {
    super();
    this.started = false;
    this.s3Parts = [];
    this.chunks = [];
    this.buffered = 0;
    this.fileName = fileName;
    // Reuse the module-level client instead of building (and never
    // destroying) a separate one per stream; every request of this class
    // goes through `this.client`.
    this.client = client;
  }

  /** Re-emits `e` as an `'error'` event on this stream. */
  onError = (e: Error) => {
    this.emit('error', e);
  };

  /** S3 object key of the upload. */
  get key(): string {
    return this.fileName;
  }

  /**
   * `Writable` hook: buffers one chunk and sends a part once enough bytes
   * have accumulated. Failures are passed to `next` so the stream errors
   * instead of hanging.
   */
  _write(
    chunk: Buffer | string,
    encoding: BufferEncoding,
    next: (error?: Error | null) => void,
  ): void {
    // Always copy: `next` is usually called before the bytes are uploaded,
    // after which the producer is free to reuse its buffer.
    const buffer =
      typeof chunk === 'string' ? Buffer.from(chunk, encoding) : Buffer.from(chunk);
    this.chunks.push(buffer);
    // Count bytes, not characters, so string chunks are measured correctly.
    this.buffered += buffer.length;
    this.acceptChunk(buffer.length).then(() => next(), next);
  }

  /**
   * `Writable` hook: runs after the last write has completed and before
   * `'finish'` is emitted. Completes the multipart upload.
   */
  _final(callback: (error?: Error | null) => void): void {
    this.finishUpload().then(() => callback(), callback);
  }

  /**
   * Starts the multipart upload and records its upload id.
   *
   * @throws When the request fails or S3 returns no upload id.
   */
  async startUpload(): Promise<void> {
    this.ack = this.client.send(
      new CreateMultipartUploadCommand({
        Key: this.key,
        ACL: 'public-read',
        Bucket,
      }),
    );
    const { UploadId } = await this.ack;
    if (!UploadId) {
      throw new Error('S3 did not return an UploadId for the multipart upload');
    }
    this.uploadId = UploadId;
  }

  /**
   * Sends whatever is still buffered, waits for every part to be stored,
   * completes the multipart upload and emits `'done'` with the object URL.
   *
   * @throws When starting the upload, any part, or the completion fails.
   */
  async finishUpload(): Promise<void> {
    if (this.ack) {
      // The start request may still be in flight when this is called
      // directly; wait for it so the upload id is known.
      const { UploadId } = await this.ack;
      this.uploadId = this.uploadId ?? UploadId;
    } else {
      // Nothing was ever written, so no upload exists yet.
      await this.startUpload();
    }

    // Send the remainder. When no part exists at all (empty stream) a single
    // empty part is sent so the completion request has something to list.
    // NOTE(review): S3 is expected to accept a zero-length sole part —
    // confirm, or switch to a plain PutObject for empty files.
    if (this.buffered > 0 || this.s3Parts.length === 0) {
      await this.sendPart();
    }

    const Parts = await Promise.all(this.s3Parts);
    await this.client.send(
      new CompleteMultipartUploadCommand({
        Bucket,
        UploadId: this.uploadId,
        Key: this.key,
        MultipartUpload: { Parts },
      }),
    );
    this.emit('done', `${AWS_PREFIX}${this.key}`);
  }

  /**
   * Sends everything currently buffered as the next part and clears the
   * buffer. The returned promise resolves as soon as the request has been
   * issued; the outcome of the upload is tracked in `s3Parts`.
   */
  async sendPart(): Promise<void> {
    const body = Buffer.concat(this.chunks);
    this.chunks = [];
    this.buffered = 0;

    const PartNumber = this.s3Parts.length + 1;
    const upload = this.client
      .send(
        new UploadPartCommand({
          Bucket,
          Key: this.key,
          UploadId: this.uploadId,
          PartNumber,
          Body: body,
        }),
      )
      .then((response): CompletedPart => {
        this.emit('progress', 0, body.length);
        return { ETag: response.ETag, PartNumber };
      });

    // Nobody awaits this promise until `finishUpload`, so attach a handler
    // now: it avoids an unhandled rejection and fails the stream as soon as
    // a part is lost.
    upload.catch((e) => this.destroy(e));
    this.s3Parts.push(upload);
  }

  /**
   * Aborts the multipart upload so S3 discards the parts stored so far.
   *
   * @returns The abort request, or `undefined` when no upload id is known.
   */
  cancelUpload() {
    if (!this.uploadId) {
      return;
    }
    return this.client.send(
      new AbortMultipartUploadCommand({
        Bucket,
        Key: this.key,
        UploadId: this.uploadId,
      }),
    );
  }

  /**
   * Bookkeeping for one buffered chunk: announces it, starts the upload on
   * the first chunk and sends a part once the threshold is reached.
   */
  private async acceptChunk(received: number): Promise<void> {
    const isFirst = !this.started;
    if (isFirst) {
      this.started = true;
      this.emit('start');
    }
    this.emit('progress', received, 0);
    if (isFirst) {
      await this.startUpload();
    }
    if (this.buffered >= MIN_CHUNK_SIZE) {
      await this.sendPart();
    }
  }
}
/**
 * Deletes an uploaded object from the bucket.
 *
 * @param src Object URL or key. The first occurrence of `AWS_PREFIX` is
 *            removed, then a single leading `/`, and what remains is used
 *            as the object key.
 */
export const remove = async (src: string): Promise<void> => {
  const Bucket = 'melty-uploads';
  const withoutPrefix = src.replace(AWS_PREFIX, '');
  const Key = withoutPrefix.replace(/^\//, '');
  const deletion = new DeleteObjectCommand({ Bucket, Key });
  await client.send(deletion);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment