Skip to content

Instantly share code, notes, and snippets.

@ezk84
Created October 23, 2020 14:44
Show Gist options
  • Save ezk84/46f548d02e7b5de09393f741298834ec to your computer and use it in GitHub Desktop.
Save ezk84/46f548d02e7b5de09393f741298834ec to your computer and use it in GitHub Desktop.
Testing pg-promise query streaming with dependent secondary query for every streamed row
#!/usr/bin/env node
const pgPromise = require("pg-promise");
const Bluebird = require("bluebird");
const QueryStream = require("pg-query-stream");
const pgpInit = { promiseLib: Bluebird, capSQL: true };
const pgp = pgPromise(pgpInit);
const db = pgp({
host: process.env.POSTGRES_HOST,
port: Number.parseInt(process.env.POSTGRES_PORT || "5432"),
user: process.env.POSTGRES_USER,
password: process.env.POSTGRES_PASSWORD,
database: process.env.POSTGRES_DB,
});
// const monitor = require("pg-monitor");
// monitor.attach(pgpInit);
main().catch(console.error);
async function main() {
try {
await db.tx("test-tx", async tx => {
const qs = new QueryStream(`SELECT s.a AS i FROM GENERATE_SERIES(1, 100) AS s(a)`);
// await spexReadChunks(tx, qs);
// await spexReadBatch(tx, qs);
// await spexReadSequence(tx, qs);
await streamOnData(tx, qs);
// await asyncIteration(tx, qs);
});
console.log("transaction done");
} catch (error) {
console.error(error);
} finally {
db.$pool.end();
}
}
/** @param {pgPromise.ITask} tx */
async function spexReadChunks(tx, qs) {
let count = 0;
const startTime = new Date();
const result = await tx.stream(qs, stream => {
pgp.spex.stream.read(
stream,
async (i, data) => {
// console.log(`handling ${i}: ${JSON.stringify(data)}`);
await innerQuery(tx, count++, startTime);
},
{ readChunks: true }
).then(r => console.log("read done", r));
});
console.log("stream done", result);
}
/** @param {pgPromise.ITask} tx */
async function spexReadBatch(tx, qs) {
let count = 0;
const startTime = new Date();
const result = await tx.stream(qs, stream => {
return pgp.spex.stream.read(
stream,
async (i, data) => {
console.log(`handling ${i}: ${JSON.stringify(data)}`);
return tx.batch(data.map(() => innerQuery(tx, count++, startTime)));
}
);
});
console.log("stream done", result);
}
/** @param {pgPromise.ITask} tx */
async function spexReadSequence(tx, qs) {
let count = 0;
const startTime = new Date();
const result = await tx.stream(qs, stream => {
return pgp.spex.stream.read(
stream,
async (i, data) => {
return tx.sequence(async j => {
// console.log(`handling ${i} ${j}: ${JSON.stringify(data)}`);
await innerQuery(tx, count++, startTime);
});
},
{ readChunks: true }
);
});
console.log("stream done", result);
}
/** @param {pgPromise.ITask} tx */
async function streamOnData(tx, qs) {
let count = 0;
const startTime = new Date();
const result = await tx.stream(qs, async stream => {
stream.on("data", async data => {
try {
// console.log(`handling ${JSON.stringify(data)}`);
stream.pause();
await innerQuery(tx, count++, startTime);
} catch (error) {
console.error(error);
} finally {
stream.resume();
}
});
});
console.log("stream done", result);
}
/** @param {pgPromise.ITask} tx */
async function asyncIteration(tx, qs) {
let count = 0;
const startTime = new Date();
const result = await tx.stream(qs, async stream => {
for await (const data of stream) {
console.log(`handling ${JSON.stringify(data)}`);
await innerQuery(tx, count++, startTime);
}
console.log("done iterating");
stream.destroy();
});
console.log("stream done", result);
}
async function innerQuery(tx, count, startTime) {
if (count % 10000 === 0) {
const duration = Math.round((new Date() - startTime) / 1000);
const mb = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
console.log(`row ${count}, ${mb}MB, ${duration} seconds`);
}
await tx.one(`SELECT ${count}`);
console.log(`inner query ${count} done`);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment