Last active
March 10, 2024 09:50
-
-
Save rbrcurtis/e27cec8e924b9adaa7d0340e20cdb999 to your computer and use it in GitHub Desktop.
stream postgres query via cursor into sequelize model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { promisify } from 'bluebird' | |
import { getPostgresClient } from 'lib/connectToDatabase' | |
import { camelCase, set } from 'lodash' | |
import { Readable, Stream } from 'stream' | |
const Cursor = require('pg-cursor') | |
type ReadFunction = (count: number) => Promise<any[]> | |
export class QueryStreamer<T> extends Readable { | |
private constructor( | |
private cursor: { close: () => Promise<void> }, | |
private reader: ReadFunction, | |
private ModelClass: new (...args: any[]) => T, | |
private batchSize = 100, | |
) { | |
super() | |
} | |
public static async getStream<T>( | |
query: string, | |
ModelClass: new (...args: any[]) => T, | |
batchSize = 100, | |
): Promise<Stream> { | |
let client = await getPostgresClient() | |
const cursor = client.query(new Cursor(query)) | |
let read: ReadFunction = promisify(cursor.read.bind(cursor)) | |
let stream = new QueryStreamer(cursor, read, ModelClass, batchSize) | |
return stream | |
} | |
_read(): this { | |
super.resume() | |
void Promise.resolve().then(async () => { | |
let rows: any | |
do { | |
if (this.destroyed || this.isPaused()) return | |
rows = await this.reader(this.batchSize) | |
rows?.forEach((row: any) => | |
this.emit( | |
'data', | |
Object.entries(row).reduce( | |
(model, [key, value]) => set<T>(model as any, camelCase(key), value), | |
new this.ModelClass(), | |
), | |
), | |
) | |
} while (rows.length) | |
await this.cursor.close() | |
this.destroy() | |
}) | |
return this | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ts-node | |
import { QueryStreamer } from 'lib/QueryStreamer' | |
import { Movie } from 'models/Movie' | |
import { connectSequelize } from 'lib/connectToDatabase' | |
async function main(): Promise<void> { | |
await connectSequelize() | |
let stream = await QueryStreamer.getStream('select * from movies', Movie) | |
stream.on('data', (movie: Movie) => { | |
console.log('data', movie.toJSON()) | |
}) | |
} | |
void main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment