Skip to content

Instantly share code, notes, and snippets.

@ChenYingChou
Last active May 2, 2020 10:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ChenYingChou/849ecbcfa597bff3c972131324d35f1a to your computer and use it in GitHub Desktop.
Save ChenYingChou/849ecbcfa597bff3c972131324d35f1a to your computer and use it in GitHub Desktop.
Node JS : Make pg-cursor easier to use
// vim: set ts=4 sw=4 sts=4 et cindent:
const pg = require('pg')
const Pool = pg.Pool
const Cursor = require('pg-cursor')
//@ts-check
class PgCursors {
/**
* @param {pg.Pool} db
*/
constructor (db) {
this._db = db
/** @type {Map<pg.PoolClient, Cursor>} */
this._cds = new Map()
}
close() {
for (const [client, cursor] of this._cds) {
cursor.close()
client.release()
}
this._cds.clear()
}
/**
* @param {string} sql - SQL statement
* @param {any[]} [params] - Parameters for `sql`
* @param {number} [count] - Batch count for each read, default is `1000`
* @param {boolean} [isBatch] - Yield batch rows instead of each row
*/
async * generator(sql, params, count, isBatch) {
if (typeof count === 'boolean') [isBatch, count] = [count, isBatch]
if (!count) count = 1000
const client = await this._db.connect()
const cursor = client.query(new Cursor(sql, params))
const read = () => {
return new Promise((resolve, reject) => {
cursor.read(count, (err, rows) => {
if (err) return reject(err)
resolve(rows)
})
})
}
this._cds.set(client, cursor)
try {
while (true) {
const rows = await read()
if (rows.length === 0) break
if (isBatch) {
yield rows
} else {
for (const row of rows) {
yield row
}
}
}
} catch (e) {
// ignore
}
this._cds.delete(client)
client.release()
}
/**
* @param {string} sql - SQL statement
* @param {any[]} params - Parameters for `sql`
* @param {number|boolean} [count] - Batch count for each read, default is `1000`
*/
batch(sql, params, count) {
return this.generator(sql, params, count, true)
}
}
async function main(num, isBatch) {
const pool = new Pool()
const cursors = new PgCursors(pool)
try {
const sql = 'SELECT email, password FROM account WHERE email LIKE $1 ESCAPE \'!\''
const values = ['abc%@example.com']
const gen = cursors.generator(sql, values, num, isBatch)
let count = 0
const print = (row) => {
count++
if (count % 1000 === 0)
console.log(`count: ${count}, row:`, row)
}
for await (const data of gen) {
if (isBatch) {
for (const row of data) {
print(row)
}
} else {
print(data)
}
if (count > 5000) gen.throw('cancel').catch(e => e)
}
console.log('Total count:', count)
} catch (error) {
console.error(error)
}
cursors.close()
pool.end()
}
main(process.argv[2], !!Number(process.argv[3]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment