@webstory
Last active February 25, 2018 14:13
Backpressure - Controllable RxJS Producer
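// Example 1: pull items one at a time from a finite source.
const rx = require('rx')

const s = rx.Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9])
// controlled() holds items back until the consumer asks for them with request(n)
const ss = s.controlled()

ss.subscribe((x) => {
  console.log('Pull ' + x)
  setTimeout(() => processItem(1, x), 100)
})

// Simulates two asynchronous processing steps per item, then pulls the next item.
// Named processItem so that it does not shadow Node's global process object.
function processItem(i, x) {
  if (i === 3) {
    ss.request(1) // done with x: ask the producer for one more item
    return
  }
  console.log(`Process ${i} - ${x}`)
  setTimeout(() => processItem(i + 1, x), 100)
}

ss.request(1) // start the flow by requesting the first item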
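// Example 2: a fast producer (Subject) feeding a slow consumer.
const rx = require('rx')

const s = new rx.Subject()
// controlled() queues everything pushed into s until request(n) is called
const ss = s.controlled()

ss.subscribe((x) => {
  console.log('Pull ' + x)
  setTimeout(() => processItem(1, x), 100)
})

// Simulates two asynchronous processing steps per item, then pulls the next item.
// Named processItem so that it does not shadow Node's global process object.
function processItem(i, x) {
  if (i === 3) {
    ss.request(1) // done with x: ask for one more item
    return
  }
  console.log(`Process ${i} - ${x}`)
  setTimeout(() => processItem(i + 1, x), 100)
}

// The producer emits 100000 items at once; they wait in the controlled queue
for (let i = 0; i < 100000; i++) {
  s.onNext(i)
}

ss.request(1) // start the flow by requesting the first item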
@webstory
Author

webstory commented Feb 25, 2018

npm install rx
(Use the rx package, which is RxJS 4, not rxjs. controlled() is not available in rxjs 5 and later.)

@webstory
Author

webstory commented Feb 25, 2018

The subscriber receives nothing until ss.request(1) is called. Each further request must therefore be issued from the last callback in the processing chain, once the current item has been fully handled.
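
To make the request-driven idea concrete without depending on rx, here is a minimal sketch of a buffer that delivers items only on demand. ControlledQueue and its push, request, and drain methods are hypothetical names invented for this illustration. They are not part of the rx package and do not claim to mirror how controlled() is implemented internally.

```javascript
// Hypothetical helper, NOT part of the rx package: buffers pushed items
// and hands them to the consumer only when request(n) is called.
class ControlledQueue {
  constructor(onItem) {
    this.buffer = []      // items produced but not yet delivered
    this.pending = 0      // how many items the consumer has asked for
    this.onItem = onItem  // consumer callback
    this.draining = false // guards against re-entrant drains
  }

  // Producer side: never blocks, just enqueues and tries to deliver.
  push(item) {
    this.buffer.push(item)
    this.drain()
  }

  // Consumer side: allow n more items to be delivered.
  request(n) {
    this.pending += n
    this.drain()
  }

  // Deliver buffered items while there is outstanding demand.
  drain() {
    if (this.draining) return
    this.draining = true
    while (this.pending > 0 && this.buffer.length > 0) {
      this.pending--
      this.onItem(this.buffer.shift())
    }
    this.draining = false
  }
}

const delivered = []
const queue = new ControlledQueue((x) => delivered.push(x))

for (let i = 0; i < 5; i++) queue.push(i) // producer runs ahead of the consumer
console.log(delivered) // [] because nothing was requested yet

queue.request(1)
console.log(delivered) // [ 0 ]

queue.request(2)
console.log(delivered) // [ 0, 1, 2 ]
```

In this sketch the consumer sets the pace. If it asked for the next item before finishing the current one, items would overlap in processing, which is why the gist calls request(1) only from the final callback.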

@webstory
Author

Why does pausable() not work?