Skip to content

Instantly share code, notes, and snippets.

@maksimr
Last active November 19, 2023 20:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maksimr/72f274a8bd69ba10ad1a6ec01f07e8f3 to your computer and use it in GitHub Desktop.
Save maksimr/72f274a8bd69ba10ad1a6ec01f07e8f3 to your computer and use it in GitHub Desktop.
Streaming data from a PostgreSQL database to S3
import { CreateBucketCommand, DeleteBucketCommand, DeleteObjectCommand, ListBucketsCommand, ListObjectsCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
import pg from 'pg';
import QueryStream from 'pg-query-stream'
import { JsonStreamStringify } from 'json-stream-stringify';
/**
 * Streams the result of a PostgreSQL query into an S3 object with low memory usage.
 *
 * Rows are read incrementally through a `pg-query-stream` cursor and serialized to
 * JSON on the fly by `JsonStreamStringify`. The resulting stream is passed as the
 * body of a `PutObjectCommand`, so the full result set is never held in memory.
 *
 * The database and S3 settings are hard-coded for a local setup: PostgreSQL on
 * 127.0.0.1:5432 and an S3-compatible endpoint on 127.0.0.1:4566 (presumably
 * LocalStack — TODO confirm). The target bucket is recreated from scratch on every run.
 */
async function main(): Promise<void> {
  const client = new pg.Client({
    user: 'username',
    password: 'password',
    database: 'test',
    host: '127.0.0.1',
    port: 5432
  });
  const s3 = initS3Client();
  await client.connect();
  try {
    const bucketName = await prepareTestBucket();
    // Open the query cursor only once the bucket is ready. This keeps it from sitting
    // open during bucket setup, and the timing below then covers query + upload.
    const myQuery = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000]);
    const myQueryStream = client.query(myQuery);
    const myQueryStringStream = new JsonStreamStringify(myQueryStream);
    const now = performance.now();
    // NOTE(review): the body is a stream of unknown length, so the SDK cannot send a
    // Content-Length header. LocalStack accepts this, but AWS S3 is expected to reject
    // it. For real S3, consider `Upload` from `@aws-sdk/lib-storage` — verify.
    const response = await s3.send(
      new PutObjectCommand({
        Bucket: bucketName,
        Key: 'foo.json',
        Body: myQueryStringStream
      })
    );
    console.log('PutObjectCommand took', duration(now), 'ms');
    console.log(response);
  } finally {
    // Release the database connection and the S3 client's sockets even when setup or
    // the upload fails; open handles would otherwise keep the process alive.
    await client.end();
    s3.destroy();
  }

  /** Creates an S3 client for the local S3-compatible endpoint, using static test credentials. */
  function initS3Client(): S3Client {
    return new S3Client({
      endpoint: 'http://127.0.0.1:4566',
      region: 'us-east-1',
      credentials: {
        accessKeyId: 'test',
        secretAccessKey: 'test'
      }
    });
  }

  /**
   * Ensures an empty bucket named `bucketName` exists.
   * If the bucket already exists, it is deleted (with its objects) and then recreated.
   *
   * @param bucketName - Name of the bucket to (re)create.
   * @returns The bucket name, for convenience.
   */
  async function prepareTestBucket(bucketName = 'test-bucket'): Promise<string> {
    const { Buckets = [] } = await s3.send(new ListBucketsCommand({}));
    if (Buckets.some(b => b.Name === bucketName)) {
      await deleteBucket(bucketName);
    }
    await s3.send(new CreateBucketCommand({ Bucket: bucketName }));
    console.log('Bucket %s created', bucketName);
    return bucketName;
  }

  /**
   * Deletes every object in `bucketName`, then deletes the bucket itself.
   *
   * @param bucketName - Name of an existing bucket to remove.
   */
  async function deleteBucket(bucketName: string): Promise<void> {
    const now = performance.now();
    console.log('Bucket %s already exists, deleting...', bucketName);
    // ListObjects returns at most 1000 keys per call. Keep listing and deleting until
    // the listing is no longer truncated; otherwise DeleteBucket fails on a non-empty bucket.
    // Deleted keys drop out of the next listing, so restarting from the beginning is fine.
    for (;;) {
      const { Contents = [], IsTruncated } = await s3.send(
        new ListObjectsCommand({ Bucket: bucketName })
      );
      await Promise.all(
        Contents.map(object =>
          s3.send(new DeleteObjectCommand({ Bucket: bucketName, Key: object.Key }))
        )
      );
      // Stop on an empty page too, so an inconsistent listing cannot loop forever.
      if (!IsTruncated || Contents.length === 0) {
        break;
      }
    }
    await s3.send(new DeleteBucketCommand({ Bucket: bucketName }));
    console.log('Bucket %s deleted (%dms)', bucketName, duration(now));
  }

  /** Milliseconds elapsed since `start` (a `performance.now()` value), rounded to 2 decimals. */
  function duration(start: number): number {
    return Math.round((performance.now() - start) * 100) / 100;
  }
}
// Run the script. Report failures explicitly and exit non-zero, instead of leaving a
// floating promise that surfaces only as an unhandled rejection.
void main().catch((err: unknown) => {
  console.error(err);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment