Skip to content

Instantly share code, notes, and snippets.

@michaelsbradleyjr
Last active May 5, 2024 20:48
Show Gist options
  • Save michaelsbradleyjr/b473c6099399eec5fdcedf016fe30b5a to your computer and use it in GitHub Desktop.
Save michaelsbradleyjr/b473c6099399eec5fdcedf016fe30b5a to your computer and use it in GitHub Desktop.
// runs as expected with Node v22.1.0, ix@5.0.0, rxjs@7.8.1
// $ npm install ix rxjs
/* global require setTimeout */
// Rx
const {Subject} = require('rxjs');
const {tap: rxTap} = require('rxjs/operators');
// Ix
const {AsyncIterableX: {from: ixFrom}} = require('ix/asynciterable');
const {tap: ixTap} = require('ix/asynciterable/operators');
const randomInt = (min, max) => Math.floor(Math.random() * (max + 1 - min)) + min;
const timer = ms => new Promise(resolve => setTimeout(resolve, ms));
const timedEffect = async (ms, effect) => { await timer(ms); effect(); };
const triangleNumber = n => n * (n + 1) / 2;
const numSends = 10;
const onceAround = triangleNumber(numSends);
const twiceAround = 2 * onceAround;
const minSendWait = 0;
const maxSendWait = minSendWait + 10;
// make minEffectWait greater than combined time needed for all sends so effects
// log only after all sends have logged (easier to visually compare the order)
const minEffectWait = numSends * maxSendWait + 100;
const maxEffectWait = minEffectWait + 100;
let _v = 0;
const asyncNext = async (sub$, v) => {
const wait = randomInt(minSendWait, maxSendWait);
await timer(wait);
console.log(`sending: ${v} (after ${wait} ms)`);
_v += v;
if (_v === onceAround || _v == twiceAround) console.log();
sub$.next(v);
};
let _resolve;
const rxFinished = new Promise(resolve => { _resolve = resolve; });
let __resolve;
const ixFinished = new Promise(resolve => { __resolve = resolve; });
let __v = 0;
const causeEffect = v => {
const wait = randomInt(minEffectWait, maxEffectWait);
// if causeEffect doesn't return a promise ix effect ordering will vary
return timedEffect(
wait,
() => {
console.log(`effect with: ${v} (after ${wait} ms)`);
__v += v;
if (__v === onceAround) _resolve();
if (__v === twiceAround) __resolve();
}
);
};
function nop() { return; }
// Key ideas:
// * Rx consumer does *not* control rate of reaction (producer does) so
// consumer's effect order exhibits non-deterministism
// * Ix consumer *does* control rate of reaction so consumer's effect order is
// totally determined by the consumer
(async () => {
console.log('Rx\n----------------------------------------------------------');
let sub$ = new Subject();
let x = 0;
sub$.pipe(rxTap(causeEffect)).subscribe();
while (x < numSends) {
asyncNext(sub$, ++x);
}
await rxFinished;
console.log('\n^ send and effect order may be different\n');
console.log('Ix\n----------------------------------------------------------');
sub$ = new Subject();
x = 0;
// .forEach() inits consumer's pulling; pipeline asynchronously awaits data
ixFrom(sub$).pipe(ixTap(causeEffect)).forEach(nop);
while (x < numSends) {
asyncNext(sub$, ++x);
}
await ixFinished;
console.log('\n^ send and effect order will *always* be the same (if `causeEffect` returns a promise)\n');
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment