Last active
November 19, 2023 20:33
-
-
Save maksimr/72f274a8bd69ba10ad1a6ec01f07e8f3 to your computer and use it in GitHub Desktop.
Streaming data from a PostgreSQL database to S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { CreateBucketCommand, DeleteBucketCommand, DeleteObjectCommand, ListBucketsCommand, ListObjectsCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3'; | |
import pg from 'pg'; | |
import QueryStream from 'pg-query-stream' | |
import { JsonStreamStringify } from 'json-stream-stringify'; | |
/**
 * Example: stream the rows of a PostgreSQL query into an S3 object with low
 * memory usage.
 *
 * The query is executed through `pg-query-stream`, the resulting row stream is
 * serialized by `JsonStreamStringify`, and that stream is handed to
 * `PutObjectCommand` as the request body.
 *
 * The PostgreSQL connection is always closed, even when preparing the bucket
 * or uploading the object fails.
 */
async function main(): Promise<void> {
  const client = new pg.Client({
    user: 'username',
    password: 'password',
    database: 'test',
    host: '127.0.0.1',
    port: 5432
  });
  const s3 = initS3Client();

  await client.connect();
  try {
    const myQuery = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000]);
    const myQueryStream = client.query(myQuery);
    const myQueryStringStream = new JsonStreamStringify(myQueryStream);
    const bucketName = await prepareTestBucket();

    const now = performance.now();
    // NOTE(review): the body is a stream of unknown length. This presumably
    // works against the local endpoint configured below, but real S3 may reject
    // a PutObject without a known content length; `Upload` from
    // `@aws-sdk/lib-storage` is the usual alternative — confirm before reuse.
    const response = await s3.send(
      new PutObjectCommand({
        Bucket: bucketName,
        Key: 'foo.json',
        Body: myQueryStringStream
      })
    );
    console.log('PutObjectCommand took', duration(now), 'ms');
    console.log(response);
  } finally {
    // Release the database connection on both the success and the failure path.
    await client.end();
  }

  /**
   * Creates the S3 client used by this example.
   * The endpoint is a fixed local address with static test credentials
   * (presumably a LocalStack instance — confirm for your setup).
   */
  function initS3Client(): S3Client {
    return new S3Client({
      endpoint: 'http://127.0.0.1:4566',
      region: 'us-east-1',
      credentials: {
        accessKeyId: 'test',
        secretAccessKey: 'test'
      }
    });
  }

  /**
   * Ensures a fresh, empty bucket exists: an existing bucket with the same
   * name is deleted first, then the bucket is created.
   *
   * @param bucketName name of the bucket to (re)create
   * @returns the name of the created bucket
   */
  async function prepareTestBucket(bucketName = 'test-bucket'): Promise<string> {
    const { Buckets = [] } = await s3.send(new ListBucketsCommand({}));
    if (Buckets.some((bucket) => bucket.Name === bucketName)) {
      await deleteBucket(bucketName);
    }
    await s3.send(new CreateBucketCommand({ Bucket: bucketName }));
    console.log('Bucket %s created', bucketName);
    return bucketName;
  }

  /**
   * Deletes every object in the bucket and then the bucket itself.
   *
   * @param bucketName name of the bucket to remove
   */
  async function deleteBucket(bucketName: string): Promise<void> {
    const now = performance.now();
    console.log('Bucket %s already exists, deleting...', bucketName);

    // A single ListObjects call returns only one page of keys (at most 1000),
    // and a non-empty bucket cannot be deleted. Because each listed page is
    // deleted before listing again, re-listing from the start yields the next
    // page until the listing is no longer truncated.
    let hasMore = false;
    do {
      const { Contents = [], IsTruncated } = await s3.send(
        new ListObjectsCommand({ Bucket: bucketName })
      );
      await Promise.all(
        Contents.map((object) =>
          s3.send(new DeleteObjectCommand({ Bucket: bucketName, Key: object.Key }))
        )
      );
      hasMore = IsTruncated === true;
    } while (hasMore);

    await s3.send(new DeleteBucketCommand({ Bucket: bucketName }));
    console.log('Bucket %s deleted (%dms)', bucketName, duration(now));
  }

  /**
   * Milliseconds elapsed since `start`, rounded to two decimal places.
   *
   * @param start a timestamp previously obtained from `performance.now()`
   */
  function duration(start: number): number {
    return Math.round((performance.now() - start) * 100) / 100;
  }
}
// Run the example. The returned promise is handled explicitly so that a
// failure is logged and reflected in the process exit code rather than being
// left as a floating promise.
main().catch((error: unknown) => {
  console.error(error);
  process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment