Skip to content

Instantly share code, notes, and snippets.

@rmela
Last active December 22, 2023 17:51
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rmela/a3bed669ad6194fb2d9670789541b0c7 to your computer and use it in GitHub Desktop.
Save rmela/a3bed669ad6194fb2d9670789541b0c7 to your computer and use it in GitHub Desktop.
SQLite query results as nodejs readable stream
echo 'create table foo ( name string, value int )' | sqlite3 foo.db
for idx in {0..1000}
do
echo "insert into foo( name, value ) values( $idx, 'abc${idx}' );"
done | sqlite3 foo.db
/*
*
* Return SQLite3 query results as a Nodejs stream, sensitive to backpressure.
*
* Assumes a foo.db file with a lot of rows - I used 1000 with a number & a string
*/
const stream = require('stream');
const sqlite = require('sqlite3');
class DBStream extends stream.Readable {
constructor( opts ) {
super( { objectMode: true } );
this.sql = opts.sql;
this.db = new sqlite.Database( opts.db );
this.stmt = this.db.prepare( this.sql );
this.on( 'end', () => this.stmt.finalize( () => this.db.close() ));
}
_read() {
let strm = this;
this.stmt.get( function(err,result) {
// If result is undefined, push null, which will end the stream.
/*
* Should have no backpressure problems,
* since _read is only called when the downstream is
* ready to fetch data
*/
err ?
strm.emit('error', err ) :
strm.push( result || null);
})
}
}
/*
* simple test
*/
stream.pipeline(
[
new DBStream( { sql:'select * from foo' } ),
new stream.Transform( { objectMode: true, transform:( data, enc, cb ) => cb( null, JSON.stringify( data ) ) } ),
process.stdout
],
err => { err && console.error(err); process.exit(0) }
)
@mohammed-ali-1
Copy link

Thank you so much! I needed to stream a result to a CSV file. This does the job 👌🏻

@kirillgroshkov
Copy link

Thanks a lot! I took inspiration from your snipped and created my version of it:

import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { Database, Statement } from 'sqlite'
import { Readable } from 'stream'

/**
 * Based on: https://gist.github.com/rmela/a3bed669ad6194fb2d9670789541b0c7
 */
export class SqliteReadable<T = any> extends Readable implements ReadableTyped<T> {
  constructor(private stmt: Statement) {
    super( { objectMode: true } );

    // might be unnecessary
    // this.on( 'end', () => {
    //   console.log(`SQLiteStream end`)
    //   void this.stmt.finalize()
    // })
  }

  static async create<T = any>(db: Database, sql: string): Promise<SqliteReadable<T>> {
    const stmt = await db.prepare(sql)
    return new SqliteReadable<T>(stmt)
  }

  /**
   * Necessary to call it, otherwise this error might occur on `db.close()`:
   * SQLITE_BUSY: unable to close due to unfinalized statements or unfinished backups
   */
  async close(): Promise<void> {
    await this.stmt.finalize()
  }

  // count = 0 // use for debugging

  override async _read(): Promise<void> {
    // console.log(`read ${++this.count}`) // debugging
    try {
      const r = await this.stmt.get<T>()
      this.push(r || null)
    } catch(err) {
      console.log(err) // todo: check if it's necessary
      this.emit('error', err)
    }
  }

}

@rmela
Copy link
Author

rmela commented Aug 21, 2021 via email

@wil92
Copy link

wil92 commented Apr 14, 2022

good stuff

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment