Skip to content

Instantly share code, notes, and snippets.

@rbrcurtis
Last active March 10, 2024 09:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rbrcurtis/e27cec8e924b9adaa7d0340e20cdb999 to your computer and use it in GitHub Desktop.
Save rbrcurtis/e27cec8e924b9adaa7d0340e20cdb999 to your computer and use it in GitHub Desktop.
Stream a Postgres query via a cursor into Sequelize model instances
import { promisify } from 'bluebird'
import { getPostgresClient } from 'lib/connectToDatabase'
import { camelCase, set } from 'lodash'
import { Readable, Stream } from 'stream'
const Cursor = require('pg-cursor')
/**
 * Promise-returning batch reader: takes the number of rows to request and
 * resolves with the rows that were read. `QueryStreamer.getStream` builds one
 * by promisifying the cursor's `read` method.
 */
type ReadFunction = (count: number) => Promise<any[]>
/**
 * Object-mode Readable that pulls rows from a database cursor in batches and
 * emits each row as an instance of `ModelClass`, with every column name
 * converted to a camelCase property name.
 *
 * Rows are delivered through the regular Readable machinery (`push`), so
 * `pause()`/`resume()`, `pipe()`, async iteration and the `'end'` event all
 * behave as they do for any other stream. The cursor is closed whenever the
 * stream is destroyed: after the last row, on error, or when the consumer
 * calls `destroy()` early.
 *
 * Instances are created through {@link QueryStreamer.getStream}.
 */
export class QueryStreamer<T> extends Readable {
  private constructor(
    private cursor: { close: () => Promise<void> },
    private reader: ReadFunction,
    private ModelClass: new (...args: any[]) => T,
    private batchSize = 100,
  ) {
    // objectMode: chunks are model instances, not Buffers/strings.
    // autoDestroy: guarantees `_destroy` (and so the cursor close) runs after
    // 'end' even on Node versions where it is not the default.
    super({ objectMode: true, autoDestroy: true })
  }

  /**
   * Opens a cursor for `query` and returns a stream of `ModelClass` instances.
   *
   * @param query      SQL text handed to the cursor as-is. No bind parameters
   *                   are supported here, so it must not be assembled from
   *                   untrusted input.
   * @param ModelClass Constructor invoked with no arguments for every row.
   * @param batchSize  Number of rows requested from the cursor per read.
   * @returns A readable stream whose `'data'` events carry `ModelClass` instances.
   */
  public static async getStream<T>(
    query: string,
    ModelClass: new (...args: any[]) => T,
    batchSize = 100,
  ): Promise<Stream> {
    // NOTE(review): the client obtained here is never released or ended by
    // this class; confirm whether `getPostgresClient` hands out a shared
    // client or a pooled one that needs to be given back.
    const client = await getPostgresClient()
    const cursor = client.query(new Cursor(query))
    const read: ReadFunction = promisify(cursor.read.bind(cursor))
    return new QueryStreamer(cursor, read, ModelClass, batchSize)
  }

  /**
   * Readable hook: fetches the next batch and pushes it. Node does not call
   * `_read` again until something has been pushed, so at most one batch is in
   * flight at a time. Returns `this` only to keep the previous signature.
   */
  _read(): this {
    void this.pushNextBatch()
    return this
  }

  /**
   * Closes the cursor when the stream is torn down, then reports the original
   * error (if any), falling back to a failure of the close itself.
   */
  _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    void Promise.resolve()
      // Wrapped in `then` so a synchronous throw from `close()` is also caught.
      .then(() => this.cursor.close())
      .then(
        () => callback(error),
        (closeError: unknown) => callback(error ?? QueryStreamer.toError(closeError)),
      )
  }

  /** Reads one batch, pushes its rows as models, and ends the stream on an empty batch. */
  private async pushNextBatch(): Promise<void> {
    try {
      const rows = await this.reader(this.batchSize)
      if (this.destroyed) return
      if (!rows?.length) {
        // An empty (or missing) batch means the cursor is exhausted.
        this.push(null)
        return
      }
      for (const row of rows) {
        // A 'data' listener may destroy the stream mid-batch.
        if (this.destroyed) return
        this.push(this.toModel(row))
      }
    } catch (error) {
      // Surface read/mapping failures as a stream 'error' instead of an
      // unhandled promise rejection.
      this.destroy(QueryStreamer.toError(error))
    }
  }

  /** Builds a `ModelClass` instance from a row, assigning each column under its camelCase name. */
  private toModel(row: Record<string, unknown>): T {
    let model = new this.ModelClass()
    for (const [column, value] of Object.entries(row)) {
      model = set<T>(model as any, camelCase(column), value)
    }
    return model
  }

  /** Normalises an arbitrary thrown value into an `Error`. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value))
  }
}
#!/usr/bin/env ts-node
import { QueryStreamer } from 'lib/QueryStreamer'
import { Movie } from 'models/Movie'
import { connectSequelize } from 'lib/connectToDatabase'
/**
 * Example script: awaits `connectSequelize()`, streams every row of `movies`
 * through `QueryStreamer` and logs each resulting `Movie` as JSON.
 *
 * Resolves once the stream has closed and rejects if the stream emits
 * `'error'`, so the caller can observe both completion and failure.
 */
async function main(): Promise<void> {
  await connectSequelize()
  const stream = await QueryStreamer.getStream('select * from movies', Movie)
  await new Promise<void>((resolve, reject) => {
    stream.on('data', (movie: Movie) => {
      console.log('data', movie.toJSON())
    })
    // 'error' is followed by 'close'; whichever settles the promise first wins.
    stream.once('error', reject)
    stream.once('close', resolve)
  })
}

// Fire-and-forget entry point: report failures and exit non-zero instead of
// leaving an unhandled rejection.
main().catch((error: unknown) => {
  console.error(error)
  process.exitCode = 1
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment