Skip to content

Instantly share code, notes, and snippets.

@michaelsbradleyjr
Last active September 2, 2019 20:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michaelsbradleyjr/5abf097e9c2b99d101003d6c2cafb1ed to your computer and use it in GitHub Desktop.
Save michaelsbradleyjr/5abf097e9c2b99d101003d6c2cafb1ed to your computer and use it in GitHub Desktop.
/* global require setImmediate setTimeout */
const {EventEmitter} = require('events');
const {inspect} = require('util');
const {fromEvent: rxFromEvent, interval: rxInterval} = require('rxjs');
const {debounce: rxDebounce, filter: rxFilter} = require('rxjs/operators');
const {
AsyncIterableX: {from: ixFrom},
batch: ixBatch
} = require('ix/asynciterable');
const {
debounce: ixDebounce,
map: ixMap,
tap: ixTap
} = require('ix/asynciterable/pipe/');
// -----------------------------------------------------------------------------
const emitter = new EventEmitter();
/* derive Rx observable from emitter */
const foo$ = rxFromEvent(emitter, 'foo');
const timer = ms => new Promise(resolve => setTimeout(resolve, ms));
ixBatch(
ixFrom(foo$.pipe(
// rxFilter(v => v % 2),
rxDebounce(() => rxInterval(50))
)).pipe(
// ixDebounce(50),
ixTap(v => {
console.log('from:', inspect(v));
})
)
).forEach(async v => {
console.log('batched:', inspect(v));
// the timer here represents something async like compiling
await timer(5000);
});
(async () => {
let x = 0;
setImmediate(() => emitter.emit('foo', ++x));
setImmediate(() => emitter.emit('foo', ++x));
setImmediate(() => emitter.emit('foo', ++x));
await timer(1000);
setImmediate(() => emitter.emit('foo', ++x));
setImmediate(() => emitter.emit('foo', ++x));
await timer(2000);
setImmediate(() => emitter.emit('foo', ++x));
setImmediate(() => emitter.emit('foo', ++x));
setImmediate(() => emitter.emit('foo', ++x));
await timer(1000);
setImmediate(() => emitter.emit('foo', ++x));
await timer(2000);
setImmediate(() => emitter.emit('foo', ++x));
setImmediate(() => emitter.emit('foo', ++x));
setImmediate(() => emitter.emit('foo', ++x));
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment