Skip to content

Instantly share code, notes, and snippets.

@dminkovsky
Last active August 3, 2022 14:36
Show Gist options
  • Save dminkovsky/cc7262cf211ecd0b11ba35739122aa68 to your computer and use it in GitHub Desktop.
Save dminkovsky/cc7262cf211ecd0b11ba35739122aa68 to your computer and use it in GitHub Desktop.
periodicallyEmittingBuffer
const { rxObserver } = require('api/v0.3');
const { zip, timer, from, Observable } = require('rxjs');
const { take } = require('rxjs/operators');
const TIMEOUT = 1;
const TAKE = 6;
const alphabet$ = from('abcdefghijklmnopqrstuvwxyz');
alphabet$.pipe(
take(TAKE)
)
.subscribe(rxObserver());
alphabet$.pipe(
periodicallyEmittingBuffer(TIMEOUT),
take(TAKE)
)
.subscribe(rxObserver());
function periodicallyEmittingBuffer(timeout) {
return (upstream) => {
const buffer = [];
return new Observable(s => {
// Buffer incoming items
upstream.subscribe({
next: (item) => buffer.push(item)
});
const maybeEmit = () => {
if (buffer.length) {
s.next(buffer.shift());
}
}
// Emit right away if something already buffered
maybeEmit();
const interval = setInterval(maybeEmit, timeout);
return () => clearInterval(interval);
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment