Instantly share code, notes, and snippets.

Embed
What would you like to do?
RxJS: Stream Completion
// An array-backed stream is finite: it emits each element in turn and then completes.
Rx.Observable.fromArray([1, 2, 3, 4])
  .subscribe(
    // Invoked once for every element of the array.
    function next(input) {
      console.log('Received event data', input)
    },
    // Invoked if the stream signals an error.
    function error(err) {
      console.log('Log error', err)
    },
    // Invoked when the stream completes.
    function complete() {
      console.log('Observable just expired.')
    }
  )
/**
 * DOM event streams, by contrast, never complete on their own.
 * To make an event stream complete, something like takeUntil can be used.
 *
 * The value kept in mouseMoveListener is what subscribe() returns, so it can
 * be used later to cancel the subscription.
 */
const mouseMoveListener = Rx.Observable
  .fromEvent(document, 'mousemove')
  .subscribe(
    // Runs for every mousemove event; the event object itself is not used.
    (input) => {
      console.log('Received event')
    },
    // Runs if the stream signals an error.
    (err) => {
      console.log('Log error', err)
    },
    // Completion handler. Note: this is effectively unreachable, because the
    // event stream never completes.
    () => {
      console.log('Mouse move subscription just expired.')
    }
  )
/**
 * The following code ensures that the mouse move subscription is cancelled
 * after 5 seconds.
 * Please note that the text 'Mouse move subscription just expired.' will never be
 * printed in the console. This is because unsubscribing from a stream and the
 * completion of a stream are two very different events.
 */
// Delay, in milliseconds, before the mouse move subscription is cancelled.
const UNSUBSCRIBE_DELAY_MS = 5000

// One-shot timer: log the intent, then cancel the subscription.
setTimeout(() => {
  console.log('Unsubscribing mouse move.')
  mouseMoveListener.unsubscribe()
}, UNSUBSCRIBE_DELAY_MS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment