arc.codes S3 download/upload utility in Node with streams
const arc = require('@architect/functions');
const { GetObjectCommand, S3Client } = require('@aws-sdk/client-s3');
const { Upload } = require('@aws-sdk/lib-storage');
const client = new S3Client({});
const CONSTANTS = require('@architect/shared/constants');
const { timer } = require('@architect/shared/utils');
const jsonlines = require('@jsonlines/core');
const stream = require('stream');
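// The two @architect/shared requires above are app-specific helpers not shown
// in this gist; assumed (hypothetical) shapes:
//   CONSTANTS.S3_CACHE_FILE - the S3 object key of the cache file, e.g. 'repo-cache.jsonl.gz'
//   timer(end, start)       - formats elapsed milliseconds for logging, e.g. `${end - start}ms`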
/*
We store a 'cache' file in S3: a gzipped JSON Lines file.
At rest, the file contains [string, number] tuples representing repo names and the TTL expiry for each repo.
`download` fetches this file from S3 and converts it into one large object with repo_name:ttl_expiry key/value pairs.
`upload` does the reverse: converts the large object back into tuples, gzips the result, and ships it to S3.
*/
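/*
For illustration, two (hypothetical) lines of the file as stored, once gunzipped:
  ["some-org/repo-a", 1704067200]
  ["some-org/repo-b", 1704070800]
Each line is one JSON array; the number is a unix-epoch TTL expiry in seconds,
matching the `now` comparison in `download` below.
*/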
module.exports = {
  download: async () => {
    let st = new Date().valueOf();
    const services = await arc.services();
    const bucket = services['storage-private'].appdata;
    let et = new Date().valueOf();
    console.log(`DL [${timer(et, st)}] arc services loaded`);
    const command = new GetObjectCommand({
      Bucket: bucket,
      Key: CONSTANTS.S3_CACHE_FILE
    });
    st = new Date().valueOf();
    const response = await client.send(command);
    return new Promise((res, rej) => {
      const repos = {};
      let n = 0; // total tuples read
      let h = 0; // tuples still within their TTL
      const now = Math.floor(new Date().valueOf() / 1000);
      response.Body
        .pipe(jsonlines.parse({ gzip: true }))
        .on('data', (data) => {
          // data is a tuple of [repo_name, ttl_expiry]
          n += 1;
          // filter out expired items from the s3 cache
          if (now < data[1]) {
            h += 1;
            repos[data[0]] = data[1];
          }
        })
        .on('error', (err) => {
          console.error('DL ERROR pulling s3 cache file!', err);
          rej(err);
        })
        .on('close', () => {
          et = new Date().valueOf();
          // guard against division by zero when the cache file is empty
          const pct = n ? Math.floor(h / n * 10000) / 100 : 0;
          console.log(`DL [${timer(et, st)}] repo cache read: ${h} valid (${pct}%)`);
          res(repos);
        });
    });
  },
  upload: async (repos) => {
    let st = new Date().valueOf();
    const services = await arc.services();
    const Bucket = services['storage-private'].appdata;
    let et = new Date().valueOf();
    console.log(`UL [${timer(et, st)}] arc services loaded`);
    // convert the { repo_name: ttl_expiry } object back into [string, number] tuples
    const arr = Object.entries(repos);
    st = new Date().valueOf();
    const str = stream.Readable.from(arr)
      .pipe(jsonlines.stringify({ gzip: true }))
      .on('error', (err) => {
        // a throw here would be uncaught (we are inside an event handler);
        // log instead and let parallelUploads3.done() reject on the failed body
        console.error('UL ERROR piping s3 cache file to s3', err);
      })
      .on('close', () => {
        et = new Date().valueOf();
        console.log(`UL [${timer(et, st)}] s3 cache stream closed`);
      });
    const parallelUploads3 = new Upload({
      client,
      queueSize: 1, // optional concurrency configuration
      leavePartsOnError: false, // optional manually handle dropped parts
      params: {
        Bucket,
        Key: CONSTANTS.S3_CACHE_FILE,
        Body: str
      }
    });
    const res = await parallelUploads3.done();
    et = new Date().valueOf();
    console.log(`UL [${timer(et, st)}] S3 HTTP ${res.$metadata.httpStatusCode}`);
  }
};
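
/*
Minimal usage sketch, assuming this module is exposed via @architect/shared
(the require path and repo name below are hypothetical):

  const cache = require('@architect/shared/s3-cache');
  const repos = await cache.download();
  // add an entry that expires one hour from now
  repos['some-org/repo-a'] = Math.floor(new Date().valueOf() / 1000) + 3600;
  await cache.upload(repos);
*/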