@superherointj
Created June 8, 2016 20:03
Take 2 items at a time and run a list of sequential operations on each: 2 items * 6 operations * 4 batches.
const Rx = require('rxjs');
// This function is expected to run its steps serially, one step at a time, so each item should take 6 seconds.
function processItem(item) {
  let queueTransformed = [
    Rx.Observable.timer(1000).do(x => console.log('1', item)).map(x => '1' + item),
    Rx.Observable.timer(1000).do(x => console.log('2', item)).map(x => '2' + item),
    Rx.Observable.timer(1000).do(x => console.log('3', item)).map(x => '3' + item),
    Rx.Observable.timer(1000).do(x => console.log('4', item)).map(x => '4' + item),
    Rx.Observable.timer(1000).do(x => console.log('5', item)).map(x => '5' + item),
    Rx.Observable.timer(1000).do(x => console.log('6', item)).map(x => '6' + item)
  ];
  return Rx.Observable.from(queueTransformed).concatAll();
}
// Items in 'arr' should be processed two at a time, so this array should take 4 iterations to finish.
let arr = ['a','b','c','d','e','f','g'];
Rx.Observable.from(arr)
  .windowCount(2) // Split the source into window Observables of 2 items each.
  .map(win => { // For each window, run processItem on every item in it.
    return win.mergeMap(item => processItem(item));
  })
  .concatAll() // Wait for each window Observable to finish before taking the next.
  .subscribe(x=>console.log('Z-nxt =>',x), err=>console.log('Z-err =>',err), ()=>console.log('Z-end.'));
/* === CURRENT OUTPUT ===
c:\dev\_sandbox\rxwindow>node app.js
1 a
Znxt => 1a
1 b
Znxt => 1b
2 a
Znxt => 2a
2 b
Znxt => 2b
3 a
Znxt => 3a
3 b
Znxt => 3b
4 a
Znxt => 4a
4 b
Znxt => 4b
5 a
Znxt => 5a
5 b
Znxt => 5b
6 a
Znxt => 6a
6 b
Znxt => 6b
C:\dev\_sandbox\rxwindow\node_modules\rxjs\scheduler\QueueScheduler.js:24
throw action.error;
^
ObjectUnsubscribedError:
c:\dev\_sandbox\rxwindow>
--------------------------------
==> Desired behaviour: instead of throwing 'ObjectUnsubscribedError', take the next pair of items (c, d) and keep repeating the same operation.
*/
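The scheduling the comments above ask for (two items in flight at once, six serial steps per item, the next pair starting only when the current pair is done) can be sketched without RxJS. This is a minimal illustration using plain Promises, not a fix for the `windowCount` pipeline itself. The names `delay`, `processInBatches`, `steps` and `stepMs` are hypothetical, and `processItem` here is a Promise-based stand-in for the Observable version above. A likely cause of the error is that windows are hot and `concatAll` subscribes to the later ones too late. If so, buffering into arrays (for example with `bufferCount(2)`) and using `concatMap` over each array may be the closer RxJS equivalent, but that is an assumption and not verified here.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

// Runs `steps` operations on one item strictly one after another.
// Returns the labels produced for that item, e.g. ['1a', '2a', ...].
async function processItem(item, steps, stepMs) {
  const results = [];
  for (let step = 1; step <= steps; step++) {
    await delay(stepMs); // each step waits for the previous one to finish
    results.push(String(step) + item);
  }
  return results;
}

// Processes `items` in batches of `batchSize`. Items within a batch run
// concurrently; the next batch starts only after the current one completes.
async function processInBatches(items, batchSize, steps, stepMs) {
  const output = [];
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    const batchResults = await Promise.all(
      batch.map(item => processItem(item, steps, stepMs))
    );
    output.push(batchResults); // one entry per batch, in batch order
  }
  return output;
}
```

Calling `processInBatches(['a','b','c','d','e','f','g'], 2, 6, 1000)` would mirror the numbers in the gist: four batches of roughly six seconds each, with the last batch containing only `'g'`.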