Skip to content

Instantly share code, notes, and snippets.

@RecuencoJones
Last active November 3, 2018 14:53
Show Gist options
  • Save RecuencoJones/e3db146d2b08f025ff7d0382a90bbbe3 to your computer and use it in GitHub Desktop.
Save RecuencoJones/e3db146d2b08f025ff7d0382a90bbbe3 to your computer and use it in GitHub Desktop.
Use a workerpool to consume SWAPI with RxJS
const { pool } = require('workerpool')
const { Observable } = require('rxjs')
require('rxjs/add/observable/fromPromise')
require('rxjs/add/observable/empty')
require('rxjs/add/observable/forkJoin')
require('rxjs/add/operator/expand')
const { fromPromise, empty, forkJoin } = Observable
const start = 'https://swapi.co/api/people'
const workers = pool({
minWorkers: 2,
maxWorkers: 6
})
function getJson(url) {
const fetch = require('node-fetch')
return fetch(url).then((res) => res.json())
}
function main() {
// get characters page
return fromPromise(getJson(start))
// check whether we need to recurse and retrieve next page
.expand(({ next }) => next ? fromPromise(getJson(next)) : empty())
.flatMap(({ results }) => forkJoin(
// for each character in page, offload retrieving its details to worker
// emit once all tasks from current page are resolved by the pool
results.map(({ url }) => fromPromise(workers.exec(getJson, [ url ])))
))
}
console.log('Start', workers.stats())
main()
.forEach((data) => {
console.log(`Batch finished: ${ data.length }`, workers.stats())
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment