Skip to content

Instantly share code, notes, and snippets.

@michaelsbradleyjr
Last active September 2, 2019 20:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michaelsbradleyjr/62d16f63a3ac318b42d8132fc178acda to your computer and use it in GitHub Desktop.
Save michaelsbradleyjr/62d16f63a3ac318b42d8132fc178acda to your computer and use it in GitHub Desktop.
Using IxJS batch with EventEmitter / RxJS Observable
/* global require setTimeout */
const {Subject} = require('rxjs');
const {tap: rxTap} = require('rxjs/operators');
const {AsyncIterableX: {from: ixFrom}} = require('ix/asynciterable');
/**
 * Picks a random integer between `min` and `max`, both bounds inclusive.
 * @param {number} min - lowest value that may be returned
 * @param {number} max - highest value that may be returned
 * @returns {number} an integer in [min, max]
 */
const randomInt = (min, max) => {
  // number of distinct integers in the inclusive range
  const span = max + 1 - min;
  const offset = Math.floor(Math.random() * span);
  return offset + min;
};
/**
 * Promise-based sleep.
 * @param {number} ms - delay in milliseconds
 * @returns {Promise<void>} settles once the timeout fires
 */
const timer = ms => new Promise(done => {
  setTimeout(done, ms);
});
/**
 * Runs `effect` after a delay.
 * @param {number} ms - how long to wait before running the effect
 * @param {Function} effect - zero-argument callback invoked once the wait is over
 * @returns {Promise<void>} settles after `effect` has been called
 */
const timedEffect = async (ms, effect) => {
  await timer(ms);
  effect();
};
/**
 * Computes the n-th triangular number, i.e. 1 + 2 + ... + n.
 * @param {number} n - index of the triangular number
 * @returns {number} n * (n + 1) / 2
 */
const triangleNumber = n => {
  const product = n * (n + 1);
  return product / 2;
};
// number of values sent per run; the values themselves are 1..numSends
const numSends = 5;
// sum of the values sent in a single run (1 + 2 + ... + numSends)
const onceAround = triangleNumber(numSends);
// sum of the values sent across both runs
const twiceAround = 2 * onceAround;
// bounds (ms) for the random delay before each send
const minSendWait = 0;
const maxSendWait = minSendWait + 10;
// make minEffectWait greater than combined time needed for all sends so effects
// log only after all sends have logged (easier to visually compare the order)
const minEffectWait = numSends * maxSendWait + 100;
// upper bound (ms) for the random delay before each effect
const maxEffectWait = minEffectWait + 100;
// running total of every value sent so far, accumulated across both runs
let _v = 0;
/**
 * Waits a random delay, logs the send, then pushes `v` into `sub$`.
 * @param {Subject} sub$ - subject that receives the value
 * @param {number} v - value to send
 * @returns {Promise<void>} settles after `sub$.next(v)` has been called
 */
const asyncNext = async (sub$, v) => {
  const wait = randomInt(minSendWait, maxSendWait);
  await timer(wait);
  console.log(`sending: ${v} (after ${wait} ms)`);
  _v += v;
  // the total only reaches these sums once every send of a run has logged,
  // so this prints a blank separator line after the last send of each run
  if (_v === onceAround || _v === twiceAround) console.log();
  sub$.next(v);
};
// resolver for `rxFinished`, captured so it can be called outside the executor
let _resolve;
// settles when `causeEffect` sees the effect total reach `onceAround`
const rxFinished = new Promise(resolve => { _resolve = resolve; });
// resolver for `ixFinished`, captured the same way
let __resolve;
// settles when `causeEffect` sees the effect total reach `twiceAround`
const ixFinished = new Promise(resolve => { __resolve = resolve; });
// running total of every value whose effect has run so far
let __v = 0;
/**
 * Schedules a delayed, logged side effect for `v` and tracks overall progress.
 * @param {number} v - value the effect is run for
 * @returns {Promise<void>} settles once the effect has run
 */
const causeEffect = v => {
  const wait = randomInt(minEffectWait, maxEffectWait);
  const runEffect = () => {
    console.log(`effect with: ${v} (after ${wait} ms)`);
    __v += v;
    // release whichever run has just seen its final effect
    if (__v === onceAround) {
      _resolve();
    }
    if (__v === twiceAround) {
      __resolve();
    }
  };
  // if causeEffect doesn't return a promise ix effect ordering will vary
  return timedEffect(wait, runEffect);
};
// Runs the same randomly delayed sends through an Rx pipeline and then through
// an Ix async iterable, so the logged send/effect orders can be compared.
(async () => {
  console.log('Rx\n----------------------------------------------------------');
  const rxSub$ = new Subject();
  rxSub$.pipe(rxTap(causeEffect)).subscribe();
  for (let x = 1; x <= numSends; x += 1) {
    // not awaited: every send is started immediately
    asyncNext(rxSub$, x);
  }
  await rxFinished;
  console.log('\n^ send and effect order may be different\n');

  console.log('Ix\n----------------------------------------------------------');
  const ixSub$ = new Subject();
  ixFrom(ixSub$).forEach(causeEffect);
  // ^ could use piped ops instead
  // See :: https://github.com/ReactiveX/IxJS/pull/264
  // would be: `ixFrom(sub$).pipe(ixTap(causeEffect)).forEach(() => {})`
  // where `tap` is imported from 'ix/asynciterable/operators' as `ixTap`
  // and `from` is imported from 'ix/asynciterable' as `ixFrom`
  for (let x = 1; x <= numSends; x += 1) {
    // not awaited: every send is started immediately
    asyncNext(ixSub$, x);
  }
  await ixFinished;
  console.log('\n^ send and effect order will *always* be the same if `causeEffect` returns a promise\n');
})();
/* global require setTimeout */
const {EventEmitter} = require('events');
const {inspect} = require('util');
const {fromEvent: rxFromEvent} = require('rxjs');
const {
AsyncIterableX: {from: ixFrom},
batch: ixBatch,
fromEvent: ixFromEvent
} = require('ix/asynciterable');
// -----------------------------------------------------------------------------
// source of the 'foo' events consumed below
const emitter = new EventEmitter();
/* derive rx observable from emitter */
const foo$ = rxFromEvent(emitter, 'foo');
// wrap the observable as an Ix async iterable and batch it
// NOTE(review): presumably `batch` groups values that arrive while the consumer
// is still busy into a single array — confirm against the IxJS docs
const batched = ixBatch(ixFrom(foo$));
/* alternatively derive async iterable directly from emitter */
// const batched = ixBatch(ixFromEvent(emitter, 'foo'));
// resolves after `ms` milliseconds
const timer = ms => new Promise(resolve => setTimeout(resolve, ms));
// Consume the batched iterable: print each item, then stay busy for two seconds
batched.forEach(async xs => {
  const rendered = inspect(xs);
  console.log(rendered);
  // the timer here represents something async like compiling
  await timer(2000);
});
/* equivalent alternative to .forEach would be to process manually with JS
* built-in `for-await...of` syntax
* See :: https://devdocs.io/javascript/statements/for-await...of
*/
// (async () => {
// for await (const xs of batched) {
// console.log(inspect(xs));
// await timer(2000);
// }
// })();
// Emits 'foo' events carrying 1, 2, 3, ... in synchronous bursts separated by pauses.
(async () => {
  // each entry: how many events to emit back-to-back, then how long to pause
  // (ms); `null` means no pause after the burst
  const bursts = [
    [3, 100],
    [2, 100],
    [3, 100],
    [1, 2000],
    [3, null]
  ];
  let x = 0;
  for (const [count, pauseMs] of bursts) {
    for (let i = 0; i < count; i += 1) {
      emitter.emit('foo', ++x);
    }
    if (pauseMs !== null) {
      await timer(pauseMs);
    }
  }
})();
{
"name": "experiment",
"private": true,
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"ix": "^2.5.3",
"rxjs": "^6.5.2"
}
}
/* global require setImmediate setTimeout */
const {EventEmitter} = require('events');
const {inspect} = require('util');
const {fromEvent: rxFromEvent, interval: rxInterval} = require('rxjs');
const {
debounce: rxDebounce,
filter: rxFilter,
tap: rxTap
} = require('rxjs/operators');
const {
AsyncIterableX: {from: ixFrom},
batch: ixBatch
} = require('ix/asynciterable');
const {
debounce: ixDebounce,
tap: ixTap
} = require('ix/asynciterable/pipe/');
// -----------------------------------------------------------------------------
// source of the 'foo' events fed through the pipeline below
const emitter = new EventEmitter();
/* derive Rx observable from emitter */
const foo$ = rxFromEvent(emitter, 'foo');
// resolves after `ms` milliseconds
const timer = ms => new Promise(resolve => setTimeout(resolve, ms));
// Rx stage: debounce the raw events and log each value that gets through
const debounced$ = foo$.pipe(
  // rxFilter(v => v % 2),
  rxDebounce(() => rxInterval(50)),
  rxTap(value => {
    console.log('event:', inspect(value));
  })
);
// Ix stage: view the observable as an async iterable and log each value pulled from it
const tapped = ixFrom(debounced$).pipe(
  // ixDebounce(50),
  ixTap(value => {
    console.log('from:', inspect(value));
  })
);
// Consumer: log every batched item, then stay busy for five seconds
ixBatch(tapped).forEach(async group => {
  console.log('batched:', inspect(group));
  // the timer here represents something async like compiling
  await timer(5000);
});
// Queues 'foo' events carrying 1, 2, 3, ... via setImmediate, in bursts separated by pauses.
(async () => {
  let x = 0;
  const emitNext = () => emitter.emit('foo', ++x);
  // each entry: how many emits to queue, then how long to pause (ms);
  // `null` means no pause after the burst
  const bursts = [
    [3, 1000],
    [2, 2000],
    [3, 1000],
    [1, 2000],
    [3, null]
  ];
  for (const [count, pauseMs] of bursts) {
    for (let i = 0; i < count; i += 1) {
      setImmediate(emitNext);
    }
    if (pauseMs !== null) {
      await timer(pauseMs);
    }
  }
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment