Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wellcaffeinated/f908094998edf54dc5840c8c3ad734d3 to your computer and use it in GitHub Desktop.
Save wellcaffeinated/f908094998edf54dc5840c8c3ad734d3 to your computer and use it in GitHub Desktop.
Rx.Observable that can sequentially pull data from a mongoose (mongodb) cursor with fine control over concurrency
const Rx = require( 'rxjs/Rx' );
// This took me way too long to figure out. Hope this helps someone.
// <3 Well Caffeinated
function fromCursor( cursor ){
return new Rx.Observable((obs) => {
// is the connection closed
var closed = false
// get the next document
function getNext(){
return cursor.next((err, doc) => {
if ( err ){
return obs.error( err )
}
if ( !doc ){
// no document so we're done
return obs.complete()
}
// call next, however we'll pass it an observable
// that way we delay fetching the next document until
// the current one is observed
obs.next(Rx.Observable.defer( () => {
if ( !closed ){ getNext() }
return Rx.Observable.of(doc)
}))
})
}
// start
getNext()
// cleanup
return () => {
closed = true;
cursor.close();
}
})
}
const Model = require('some/mongoose/model')
const taskFn = require('some/task/fn')
function performMaintenance( concurrency = 1 ){
let query = Model.find({ needsMaintenance: true })
// this will return an observable stream of the result of taskFn
return fromCursor( query.cursor() ).mergeMap( obs => {
// we use mergeMap because we want to perform async tasks on each document
// we need to use flatMap because we're receiving an observable, not the document!
// taskFn could return an observable, or a promise
return obs.flatMap( doc => taskFn(doc) )
}, concurrency)
}
// do stuff
performMaintenance( 2 ).subscribe({
next: ( result ) => console.log('updated document', result)
, error: ( err ) => console.error( err )
, complete: () => console.log('done')
})
@midzdotdev
Copy link

I thought I'd share my version which I'm using in my project.

export function cursor$ <T> (cursor: Cursor<T>): Observable<T> {

  const next$ = () => from(cursor.hasNext())
    .pipe(
      concatMap(x => x ? from(<Promise<T>> cursor.next()) : empty()),
    )
  
  return next$().pipe(expand(() => next$()))
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment