Skip to content

Instantly share code, notes, and snippets.

@RecuencoJones
Last active November 1, 2018 22:10
Show Gist options
  • Save RecuencoJones/de8a11cf12aedcb7230a027405b80778 to your computer and use it in GitHub Desktop.
Save RecuencoJones/de8a11cf12aedcb7230a027405b80778 to your computer and use it in GitHub Desktop.
RxJS pagination
const { create, of, fromPromise } = Rx.Observable
////// main //////
const charactersUrl = 'https://swapi.co/api/people'
// Requests the characters resource page by page, starting at charactersUrl.
const getAllCharacters = () => getPagedResource(charactersUrl)
// outputs all characters at once
getAllCharacters().subscribe(
  (vals) => {
    console.log('blocking', vals)
  },
  // report a failed request instead of leaving the error unhandled
  (err) => {
    console.error('blocking', err)
  }
)
////// helpers //////
// fetch pagination blocking
/**
 * Fetch every page of a paginated resource and emit all results at once.
 *
 * Pages are requested one after another by following each response's `next`
 * URL. The first request is only made when the returned observable is
 * subscribed to.
 *
 * @param {string} startUrl - URL of the first page.
 * @returns {Observable} Emits a single array holding the `results` of every
 *   page in order, then completes. A failed request reaches the subscriber's
 *   error callback.
 */
function getPagedResource(startUrl) {
  const { defer, empty } = Rx.Observable
  // defer keeps the first request lazy and repeats it for each subscription
  return defer(() =>
    get(startUrl)
      // a page without `next` is the last one, so stop expanding there
      .expand(({ next }) => (next ? get(next) : empty()))
      // a page lacking `results` contributes nothing to the final array
      .map(({ results }) => results || [])
      // accumulate here so errors and unsubscription flow through the chain
      .reduce((all, page) => all.concat(page), [])
  )
}
// fetch json as observable
/**
 * Fetch a URL and expose its parsed JSON body as an observable.
 *
 * @param {string} url - Address to request.
 * @returns {Observable} Observable built from the request promise; it errors
 *   when the request fails, the response status is not OK, or the body cannot
 *   be parsed as JSON.
 */
function get(url) {
  const body = fetch(url).then((res) => {
    // fetch resolves on HTTP error statuses too, so reject them explicitly
    if (!res.ok) {
      throw new Error(`Request to ${url} failed: ${res.status} ${res.statusText}`)
    }
    return res.json()
  })
  return fromPromise(body)
}
const { create, empty, of, fromPromise } = Rx.Observable
////// main //////
const charactersUrl = 'https://swapi.co/api/people'
// `blocking` is forwarded to getPagedResource to pick the delivery strategy.
const getAllCharacters = ({ blocking } = {}) => getPagedResource({
  startUrl: charactersUrl, blocking
})
// outputs chunks of characters as they arrive
getAllCharacters().subscribe(
  (vals) => {
    console.log('stream strat', vals)
  },
  // report a failed request instead of leaving the error unhandled
  (err) => {
    console.error('stream strat', err)
  }
)
// outputs all characters at once
getAllCharacters({ blocking: true }).subscribe(
  (vals) => {
    console.log('blocking strat', vals)
  },
  // report a failed request instead of leaving the error unhandled
  (err) => {
    console.error('blocking strat', err)
  }
)
////// helpers //////
// fetch pagination based on strategy
/**
 * Walk a paginated resource and hand the stream of pages to the strategy
 * selected by `blocking`.
 *
 * @param {Object} options
 * @param {string} options.startUrl - URL of the first page.
 * @param {boolean} [options.blocking] - Passed to selectReactiveStrategy.
 * @returns {Observable} Whatever the selected strategy returns for the stream
 *   of each page's `results`.
 */
function getPagedResource({ startUrl, blocking }) {
  const applyStrategy = selectReactiveStrategy(blocking)
  // After the last page, blocking mode emits `{}` (its `results` is undefined)
  // as an end marker; streaming mode simply stops.
  const endOfPages = () => {
    if (blocking) {
      return of({})
    }
    return empty()
  }
  const followNext = (page) => {
    const nextUrl = page.next
    return nextUrl ? get(nextUrl) : endOfPages()
  }
  const pickResults = (page) => page.results
  const pages$ = get(startUrl).expand(followNext).map(pickResults)
  return applyStrategy(pages$)
}
/**
 * Pick how a stream of pages is delivered to the subscriber.
 *
 * @param {boolean} [blocking] - When truthy, every page is collected into one
 *   array; otherwise the pages are passed through unchanged.
 * @returns {function(Observable): Observable} Transformer for the page stream.
 *   In blocking mode the result emits a single concatenated array and then
 *   completes. Errors from the source reach the subscriber's error callback.
 */
function selectReactiveStrategy(blocking) {
  if (!blocking) {
    return (obs$) => obs$
  }
  return (obs$) =>
    obs$
      // a falsy page is the source's end-of-data marker, so stop there
      .takeWhile((page) => Boolean(page))
      // operators (unlike a hand-rolled inner subscription) forward errors
      // and unsubscription through the chain
      .reduce((all, page) => all.concat(page), [])
}
// fetch json as observable
/**
 * Fetch a URL and expose its parsed JSON body as an observable.
 *
 * @param {string} url - Address to request.
 * @returns {Observable} Observable built from the request promise; it errors
 *   when the request fails, the response status is not OK, or the body cannot
 *   be parsed as JSON.
 */
function get(url) {
  const body = fetch(url).then((res) => {
    // fetch resolves on HTTP error statuses too, so reject them explicitly
    if (!res.ok) {
      throw new Error(`Request to ${url} failed: ${res.status} ${res.statusText}`)
    }
    return res.json()
  })
  return fromPromise(body)
}
const { empty, fromPromise } = Rx.Observable
////// main //////
const charactersUrl = 'https://swapi.co/api/people'
// Requests the characters resource page by page, starting at charactersUrl.
const getAllCharacters = () => getPagedResource(charactersUrl)
// outputs chunks of characters as they arrive
getAllCharacters().subscribe(
  (vals) => {
    console.log('stream strategy', vals)
  },
  // report a failed request instead of leaving the error unhandled
  (err) => {
    console.error('stream strategy', err)
  }
)
////// helpers //////
// fetch pagination stream
/**
 * Stream a paginated resource one page at a time.
 *
 * @param {string} startUrl - URL of the first page.
 * @returns {Observable} Emits the `results` of each page, following every
 *   response's `next` URL until a page has none.
 */
function getPagedResource(startUrl) {
  // a page with no `next` URL ends the expansion
  const followNext = (page) => {
    const nextUrl = page.next
    if (nextUrl) {
      return get(nextUrl)
    }
    return empty()
  }
  const firstPage$ = get(startUrl)
  const allPages$ = firstPage$.expand(followNext)
  return allPages$.map((page) => page.results)
}
// fetch json as observable
/**
 * Fetch a URL and expose its parsed JSON body as an observable.
 *
 * @param {string} url - Address to request.
 * @returns {Observable} Observable built from the request promise; it errors
 *   when the request fails, the response status is not OK, or the body cannot
 *   be parsed as JSON.
 */
function get(url) {
  const body = fetch(url).then((res) => {
    // fetch resolves on HTTP error statuses too, so reject them explicitly
    if (!res.ok) {
      throw new Error(`Request to ${url} failed: ${res.status} ${res.statusText}`)
    }
    return res.json()
  })
  return fromPromise(body)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment