Last active
September 2, 2019 20:37
-
-
Save michaelsbradleyjr/62d16f63a3ac318b42d8132fc178acda to your computer and use it in GitHub Desktop.
Using IxJS batch with EventEmitter / RxJS Observable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* global require setTimeout */ | |
const {Subject} = require('rxjs'); | |
const {tap: rxTap} = require('rxjs/operators'); | |
const {AsyncIterableX: {from: ixFrom}} = require('ix/asynciterable'); | |
/**
 * Picks a pseudo-random integer between `min` and `max`, both inclusive.
 * Assumes both bounds are integers with `min <= max`.
 *
 * @param {number} min - lowest value that may be returned
 * @param {number} max - highest value that may be returned
 * @returns {number} a value `r` with `min <= r <= max`
 */
const randomInt = (min, max) => Math.floor(Math.random() * (max + 1 - min)) + min;
/**
 * Promise-based sleep built on `setTimeout`.
 *
 * @param {number} ms - delay in milliseconds
 * @returns {Promise<void>} resolves (with no value) once the timeout fires
 */
const timer = ms => new Promise(resolve => setTimeout(resolve, ms));
/**
 * Waits `ms` milliseconds and then invokes `effect` with no arguments.
 *
 * @param {number} ms - delay before the effect runs
 * @param {Function} effect - callback to run after the delay; its return value is ignored
 * @returns {Promise<void>} settles after `effect` has been called
 */
const timedEffect = async (ms, effect) => {
  await timer(ms);
  effect();
};
/**
 * Computes the n-th triangular number, i.e. 1 + 2 + ... + n.
 *
 * @param {number} n - how many terms to sum
 * @returns {number} n * (n + 1) / 2
 */
const triangleNumber = n => n * (n + 1) / 2;
// how many values (1..numSends) are pushed through the subject in each run
const numSends = 5;
// running totals reached once the first run, and then the second run, have
// each processed every value 1..numSends
const onceAround = triangleNumber(numSends);
const twiceAround = 2 * onceAround;
// bounds (ms) for the random delay before each send
const minSendWait = 0;
const maxSendWait = minSendWait + 10;
// make minEffectWait greater than combined time needed for all sends so effects
// log only after all sends have logged (easier to visually compare the order)
const minEffectWait = numSends * maxSendWait + 100;
const maxEffectWait = minEffectWait + 100;
// running sum of the values sent so far, across both runs (updated by asyncNext)
let _v = 0;
/**
 * Pushes `v` into `sub$` after a random delay between minSendWait and
 * maxSendWait milliseconds, logging the send first.
 *
 * @param {{next: Function}} sub$ - subject (or anything with `next`) that receives the value
 * @param {number} v - value to send; also added to the running total `_v`
 * @returns {Promise<void>} settles once the value has been handed to `sub$`
 */
const asyncNext = async (sub$, v) => {
  const wait = randomInt(minSendWait, maxSendWait);
  await timer(wait);
  console.log(`sending: ${v} (after ${wait} ms)`);
  _v += v;
  // print a blank separator line once the last send of a run has been logged
  if (_v === onceAround || _v === twiceAround) console.log();
  sub$.next(v);
};
// Externally-resolvable promises: the resolver functions are captured so that
// causeEffect can signal when each run has applied all of its effects.
// `_resolve` settles `rxFinished`; `__resolve` settles `ixFinished`.
let _resolve;
const rxFinished = new Promise(resolve => { _resolve = resolve; });
let __resolve;
const ixFinished = new Promise(resolve => { __resolve = resolve; });
// running sum of the values whose effects have fired so far, across both runs
let __v = 0;
/**
 * Schedules a logged side effect for `v` after a random delay between
 * minEffectWait and maxEffectWait milliseconds. When the effect fires, `v` is
 * added to the running total `__v`; reaching `onceAround` resolves
 * `rxFinished` and reaching `twiceAround` resolves `ixFinished`.
 *
 * @param {number} v - value received from the stream
 * @returns {Promise<void>} settles after the effect has run
 */
const causeEffect = v => {
  const wait = randomInt(minEffectWait, maxEffectWait);
  // if causeEffect doesn't return a promise ix effect ordering will vary
  return timedEffect(
    wait,
    () => {
      console.log(`effect with: ${v} (after ${wait} ms)`);
      __v += v;
      if (__v === onceAround) _resolve();
      if (__v === twiceAround) __resolve();
    }
  );
};
// Demo driver: runs the same send/effect experiment first through an Rx
// pipeline and then through an Ix async iterable, so the logged orderings can
// be compared.
(async () => {
  // background promises are intentionally not awaited; report their failures
  // instead of leaving the rejections unhandled
  const reportFailure = err => { console.error(err); };

  console.log('Rx\n----------------------------------------------------------');
  const rx$ = new Subject();
  // rx `tap` ignores the promise returned by causeEffect, so effects overlap
  rx$.pipe(rxTap(causeEffect)).subscribe({error: reportFailure});
  for (let x = 1; x <= numSends; x++) {
    asyncNext(rx$, x).catch(reportFailure);
  }
  await rxFinished;
  rx$.complete();
  console.log('\n^ send and effect order may be different\n');

  console.log('Ix\n----------------------------------------------------------');
  const ix$ = new Subject();
  ixFrom(ix$).forEach(causeEffect).catch(reportFailure);
  // ^ could use piped ops instead
  // See :: https://github.com/ReactiveX/IxJS/pull/264
  // would be: `ixFrom(sub$).pipe(ixTap(causeEffect)).forEach(() => {})`
  // where `tap` is imported from 'ix/asynciterable/operators' as `ixTap`
  // and `from` is imported from 'ix/asynciterable' as `ixFrom`
  for (let x = 1; x <= numSends; x++) {
    asyncNext(ix$, x).catch(reportFailure);
  }
  await ixFinished;
  // completing the subject lets the pending forEach finish rather than wait
  // forever for another value
  ix$.complete();
  console.log('\n^ send and effect order will *always* be the same if `causeEffect` returns a promise\n');
})().catch(err => { console.error(err); });
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* global require setTimeout */ | |
const {EventEmitter} = require('events'); | |
const {inspect} = require('util'); | |
const {fromEvent: rxFromEvent} = require('rxjs'); | |
const { | |
AsyncIterableX: {from: ixFrom}, | |
batch: ixBatch, | |
fromEvent: ixFromEvent | |
} = require('ix/asynciterable'); | |
// -----------------------------------------------------------------------------
// source of 'foo' events for the demo
const emitter = new EventEmitter();
/* derive rx observable from emitter */
const foo$ = rxFromEvent(emitter, 'foo');
// async iterable of batches built from the observable; presumably each batch
// holds the values that arrived while the consumer was busy with the previous
// one (see the IxJS `batch` docs)
const batched = ixBatch(ixFrom(foo$));
/* alternatively derive async iterable directly from emitter */
// const batched = ixBatch(ixFromEvent(emitter, 'foo'));
/** Promise-based sleep: resolves once `ms` milliseconds have elapsed. */
const timer = ms => new Promise(resolve => setTimeout(resolve, ms));
// Consume the batches one at a time: log each batch, then simulate slow work
// before asking for the next one.
batched.forEach(async batch => {
  console.log(inspect(batch));
  // the timer here represents something async like compiling
  await timer(2000);
}).catch(err => {
  // forEach returns a promise that is not awaited here; report failures
  // rather than leaving the rejection unhandled
  console.error(err);
});
/* equivalent alternative to .forEach would be to process manually with JS
 * built-in `for-await...of` syntax
 * See :: https://devdocs.io/javascript/statements/for-await...of
 */
// (async () => {
//   for await (const xs of batched) {
//     console.log(inspect(xs));
//     await timer(2000);
//   }
// })();
// Producer: emits the integers 1..12 as 'foo' events in bursts separated by
// pauses, so that several events pile up while the consumer is busy.
(async () => {
  // each entry is [events in the burst, pause in ms afterwards];
  // `null` means no pause follows the burst
  const bursts = [
    [3, 100],
    [2, 100],
    [3, 100],
    [1, 2000],
    [3, null]
  ];
  let x = 0;
  for (const [count, pauseMs] of bursts) {
    for (let i = 0; i < count; i++) {
      emitter.emit('foo', ++x);
    }
    if (pauseMs !== null) {
      await timer(pauseMs);
    }
  }
})().catch(err => { console.error(err); });
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "name": "experiment",
  "private": true,
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "ix": "^2.5.3",
    "rxjs": "^6.5.2"
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* global require setImmediate setTimeout */ | |
const {EventEmitter} = require('events'); | |
const {inspect} = require('util'); | |
const {fromEvent: rxFromEvent, interval: rxInterval} = require('rxjs'); | |
const { | |
debounce: rxDebounce, | |
filter: rxFilter, | |
tap: rxTap | |
} = require('rxjs/operators'); | |
const { | |
AsyncIterableX: {from: ixFrom}, | |
batch: ixBatch | |
} = require('ix/asynciterable'); | |
const { | |
debounce: ixDebounce, | |
tap: ixTap | |
} = require('ix/asynciterable/pipe/'); | |
// -----------------------------------------------------------------------------
// source of 'foo' events for the demo
const emitter = new EventEmitter();
/* derive Rx observable from emitter */
const foo$ = rxFromEvent(emitter, 'foo');
/** Promise-based sleep: resolves once `ms` milliseconds have elapsed. */
const timer = ms => new Promise(resolve => setTimeout(resolve, ms));
// Pipeline: 'foo' events are debounced on the Rx side, converted to an Ix async
// iterable, batched, and then consumed slowly. Each stage logs what it sees so
// the effect of debouncing and batching can be compared in the output.
ixBatch(
  ixFrom(foo$.pipe(
    // rxFilter(v => v % 2),
    rxDebounce(() => rxInterval(50)),
    rxTap(v => {
      console.log('event:', inspect(v));
    })
  )).pipe(
    // ixDebounce(50),
    ixTap(v => {
      console.log('from:', inspect(v));
    })
  )
).forEach(async v => {
  console.log('batched:', inspect(v));
  // the timer here represents something async like compiling
  await timer(5000);
}).catch(err => {
  // forEach returns a promise that is not awaited here; report failures
  // rather than leaving the rejection unhandled
  console.error(err);
});
// Producer: schedules 'foo' events carrying 1..12 via setImmediate, in bursts
// separated by pauses. The counter is incremented inside the callback, so
// values are assigned in the order the callbacks actually run.
(async () => {
  // each entry is [events in the burst, pause in ms afterwards];
  // `null` means no pause follows the burst
  const bursts = [
    [3, 1000],
    [2, 2000],
    [3, 1000],
    [1, 2000],
    [3, null]
  ];
  let x = 0;
  for (const [count, pauseMs] of bursts) {
    for (let i = 0; i < count; i++) {
      setImmediate(() => emitter.emit('foo', ++x));
    }
    if (pauseMs !== null) {
      await timer(pauseMs);
    }
  }
})().catch(err => { console.error(err); });
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment