Last active
May 2, 2020 10:52
-
-
Save ChenYingChou/849ecbcfa597bff3c972131324d35f1a to your computer and use it in GitHub Desktop.
Node.js: Make pg-cursor easier to use
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// vim: set ts=4 sw=4 sts=4 et cindent: | |
const pg = require('pg') | |
const Pool = pg.Pool | |
const Cursor = require('pg-cursor') | |
//@ts-check | |
/**
 * Wraps `pg-cursor` so that the rows of a query can be consumed with
 * `for await`. Every cursor that is currently open is remembered together
 * with the pool client it runs on, so `close()` can tear all of them down.
 */
class PgCursors {
    /**
     * @param {pg.Pool} db - Pool that clients are checked out from
     */
    constructor (db) {
        this._db = db
        /** @type {Map<pg.PoolClient, Cursor>} */
        this._cds = new Map()
    }

    /**
     * Close every cursor that is still registered and release its client.
     */
    close() {
        // Empty the registry first: a generator that finishes later checks
        // the registry and must not release a client released here.
        const open = [...this._cds]
        this._cds.clear()
        for (const [client, cursor] of open) {
            cursor.close()
            client.release()
        }
    }

    /**
     * Run `sql` through a cursor and yield the result incrementally.
     *
     * `count` and `isBatch` may be passed in either order: a boolean in the
     * `count` position is taken as `isBatch`.
     *
     * Errors reported by the cursor while reading are rethrown to the
     * consumer. An error injected by the consumer with `gen.throw(...)` is
     * treated as a cancel request: iteration stops and nothing is rethrown.
     * Whichever way iteration ends (exhausted, `break`/`return()`, cancel or
     * failure) the client is released.
     *
     * @param {string} sql - SQL statement
     * @param {any[]} [params] - Parameters for `sql`
     * @param {number} [count] - Batch count for each read, default is `1000`
     * @param {boolean} [isBatch] - Yield batch rows instead of each row
     */
    async * generator(sql, params, count, isBatch) {
        if (typeof count === 'boolean') [isBatch, count] = [count, isBatch]
        if (!count) count = 1000
        const client = await this._db.connect()
        const cursor = client.query(new Cursor(sql, params))
        // Set when the cursor itself reported an error.
        let readFailed = false
        // Promise adapter around the callback-style `cursor.read`.
        const read = () => {
            return new Promise((resolve, reject) => {
                cursor.read(count, (err, rows) => {
                    if (err) {
                        readFailed = true
                        return reject(err)
                    }
                    resolve(rows)
                })
            })
        }
        this._cds.set(client, cursor)
        try {
            while (true) {
                // Not guarded on purpose: a failing read must reach the caller.
                const rows = await read()
                if (rows.length === 0) break
                try {
                    if (isBatch) {
                        yield rows
                    } else {
                        for (const row of rows) {
                            yield row
                        }
                    }
                } catch (e) {
                    // Only `gen.throw()` can raise here; it is the cancel signal.
                    return
                }
            }
        } finally {
            // `delete` returns false when `close()` already handled this
            // client, in which case it must not be released a second time.
            if (this._cds.delete(client)) {
                // Close the cursor before the client goes back to the pool;
                // a cursor that reported an error is left as it is.
                if (!readFailed) cursor.close()
                client.release()
            }
        }
    }

    /**
     * Same as `generator()` but always yields whole batches of rows.
     *
     * @param {string} sql - SQL statement
     * @param {any[]} params - Parameters for `sql`
     * @param {number|boolean} [count] - Batch count for each read, default is `1000`
     */
    batch(sql, params, count) {
        return this.generator(sql, params, count, true)
    }
}
/**
 * Demo: stream the matching accounts through a cursor, log every 1000th row
 * and stop early once more than 5000 rows have been seen.
 *
 * @param {number} [num] - Batch count for each read
 * @param {boolean} [isBatch] - Consume whole batches instead of single rows
 */
async function main(num, isBatch) {
    const pool = new Pool()
    const cursors = new PgCursors(pool)
    try {
        const sql = 'SELECT email, password FROM account WHERE email LIKE $1 ESCAPE \'!\''
        const values = ['abc%@example.com']
        const gen = cursors.generator(sql, values, num, isBatch)
        let count = 0
        const print = (row) => {
            count++
            if (count % 1000 === 0)
                console.log(`count: ${count}, row:`, row)
        }
        for await (const data of gen) {
            if (isBatch) {
                for (const row of data) {
                    print(row)
                }
            } else {
                print(data)
            }
            // Leaving the loop makes `for await` call gen.return(), which
            // ends the generator without throwing anything into it.
            if (count > 5000) break
        }
        console.log('Total count:', count)
    } catch (error) {
        console.error(error)
    } finally {
        // Closes whatever cursor is still registered, then shuts the pool down.
        cursors.close()
        await pool.end()
    }
}

// argv[2] arrives as a string; pass a real number, or undefined for the default.
main(Number(process.argv[2]) || undefined, !!Number(process.argv[3]))
    .catch((error) => console.error(error))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment