Skip to content

Instantly share code, notes, and snippets.

@ybouhjira
Last active October 25, 2016 01:42
Show Gist options
  • Save ybouhjira/0d6061af168005793ed6517f1aaf2c47 to your computer and use it in GitHub Desktop.
Save ybouhjira/0d6061af168005793ed6517f1aaf2c47 to your computer and use it in GitHub Desktop.
Distribute stream on an other stream
console.clear();
const items$ = Rx.Observable
.interval(100)
.map(x => `item-${x}`)
.take(100)
.share();
const cc$ = Rx.Observable
.interval(0)
.take(2)
.scan(count => count + 1, 0)
.share();
const zip$ = Rx.Observable
.zip(
items$,
items$.scan(count => count + 1, 0)
);
Rx.Observable
.combineLatest(zip$, cc$, (z, cc) => {
return [z[0], z[1], cc];
})
//.subscribe(x => console.log(x));
.groupBy(([i, c, cc]) => c % cc)
.flatMap(x => x)
.subscribe(x => console.log(x));
@ybouhjira
Copy link
Author

console.clear();

document.body.innerHTML += "<p style='width: 400px' id='div'></p>"
let div = document.querySelector('#div');

function rxDistribute(items$, clients$) {
  const cc$ = clients$
    .scan(count => count + 1, 0)
    .share();

  const zip$ = Rx.Observable.zip(
    items$, 
    items$.scan(count => count + 1, 0)
  );

  return Rx.Observable
    .combineLatest(zip$, cc$, ([item, index], count) => 
        [index, index, count]
    )
    .groupBy(
      ([i, c, cc]) => c % cc,
      ([item, index, count]) => [item, index % count]
    )
    .flatMap(x => x)
    .flatMap(([item, client]) => Rx.Observable.combineLatest(
      Rx.Observable.of(item),
      clients$.elementAt(client),
      (item, clientItem) => [item, clientItem]
    ));
}

const items$ = Rx.Observable
  .interval(100)
  .map(x => `item-${x}`)
  .take(16)
  .share();

const clients$ = Rx.Observable
  .interval(500)
  .map(client => ({client}))
  .take(5);

rxDistribute(items$, clients$)
    .filter(([_, c]) => c.client == 3)
    .scan(count => count + 1, 0)
    .subscribe(x => console.log(x));

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment