Skip to content

Instantly share code, notes, and snippets.

@petrosagg
Created July 14, 2021 15:36
Show Gist options
  • Save petrosagg/804e5f009dee1cb8af688654ba396258 to your computer and use it in GitHub Desktop.
Save petrosagg/804e5f009dee1cb8af688654ba396258 to your computer and use it in GitHub Desktop.
Consuming a stream with pg-cursor
import { Readable } from 'stream'
import pg from 'pg'
import Cursor from 'pg-cursor'
/**
 * Object-mode Readable that streams the rows of a query through a pg-cursor.
 *
 * Rows are pushed one at a time from the cursor's 'row' event instead of from
 * the batch callback of `cursor.read()`, and a new batch is only requested once
 * every row of the previous batch has been pushed.
 *
 * The class never calls `push(null)`, so the stream only finishes when it is
 * destroyed (explicitly, or because a cursor read failed).
 */
class TailStream extends Readable {
  /**
   * @param {object} client - Client used to run the cursor. It must provide
   *   `query()` and should provide either `release()` or `end()`; the stream
   *   takes ownership of it and gives it up when the stream is destroyed.
   * @param {string} text - Query text handed to the cursor.
   * @param {Array} values - Query parameters handed to the cursor.
   */
  constructor(client, text, values) {
    super({
      highWaterMark: 5,
      objectMode: true,
    });
    this.client = client;
    this.cursor = client.query(new Cursor(text, values));
    // Rows of the current batch that were requested but not yet pushed.
    this.pendingReads = 0;
    this.cursor.on('row', (row) => {
      // Decrement before pushing: push() may lead to another _read() call,
      // which must already see the batch as complete after its last row.
      this.pendingReads -= 1;
      this.push(row);
    });
  }

  /**
   * Requests the next batch of `n` rows unless a batch is still outstanding.
   *
   * @param {number} n - Number of rows the stream machinery asks for.
   */
  _read(n) {
    // The rest of the outstanding batch will still arrive via 'row' events.
    if (this.pendingReads !== 0) {
      return;
    }
    this.pendingReads = n;
    console.log('reading the next batch of', n, 'records');
    this.cursor.read(n, (err) => {
      if (err) {
        this.destroy(err);
      }
    });
  }

  /**
   * Closes the cursor, gives up the client and reports the outcome.
   *
   * @param {Error|null} err - Error the stream is being destroyed with, if any.
   * @param {Function} callback - Completion callback of the stream machinery.
   */
  _destroy(err, callback) {
    this.cursor.close((closeErr) => {
      // Forward the error: calling back without it would silently drop the
      // reason the stream was destroyed and no 'error' event would be emitted.
      const finish = () => callback(err || closeErr);
      if (typeof this.client.release === 'function') {
        // Client that exposes release() (e.g. one checked out of a pool).
        this.client.release();
        finish();
      } else if (typeof this.client.end === 'function') {
        // Client without release(): close its connection instead of
        // throwing a TypeError on the missing method.
        this.client.end(finish);
      } else {
        finish();
      }
    });
  }
}
/**
 * Connects with the connection string below, runs `TAIL foo` through a
 * TailStream and logs every row it yields.
 *
 * @returns {Promise<void>} Settles when the row loop exits; rejects if
 *   connecting or streaming fails.
 */
async function main() {
  const client = new pg.Client('postgres://materialize@localhost:6875/materialize');
  await client.connect();
  const stream = new TailStream(client, 'TAIL foo', []);
  for await (const row of stream) {
    console.log('tail row', row);
  }
}

// Never leave the promise floating: report failures and exit non-zero instead
// of relying on the runtime's unhandled-rejection behaviour.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment