Skip to content

Instantly share code, notes, and snippets.

@rogeriochaves
Created March 11, 2022 17:10
Show Gist options
  • Save rogeriochaves/57e53dd7fa8c0caf9e3306dbe8a32c7c to your computer and use it in GitHub Desktop.
Save rogeriochaves/57e53dd7fa8c0caf9e3306dbe8a32c7c to your computer and use it in GitHub Desktop.
const { merge, interval, from, Observable, Subject, zip } = Rx;
const { delay, partition, tap, take, bufferTime, bufferWhen, map, bufferCount, mergeMap, concatAll, windowCount, buffer, mergeAll, filter } = RxOperators;
const topic$ = new Subject();
const retryTopics = [1,2,3,4,5,6,7,8,9,10].map(i =>
delay(1000 * 2 ** i)(new Subject())
);
let i = 0;
setInterval(() => {
if (i < 25) {
topic$.next({action: "insert", item: i++})
topic$.next({action: "insert", item: i++})
topic$.next({action: "insert", item: i++})
topic$.next({action: "insert", item: i++})
console.log("last i", i)
}
}, 1000);
setTimeout(() => {
topic$.next({action: "delete", item: 2}),
topic$.next({action: "delete", item: 3})
}, 2500);
const done$ = new Subject();
const process = (topic$, retryNum) => {
let [inserts$, deletes$] = partition(x => x.action == "insert")(topic$)
const bufferCountOrTime = (count, bufferTimeSpan=1500) => (stream$) =>
stream$.pipe(
map(x => x.item),
windowCount(count),
mergeMap(x => bufferTime(bufferTimeSpan)(x)),
filter(x => x.length > 0)
)
inserts$ = bufferCountOrTime(3)(inserts$)
deletes$ = bufferCountOrTime(3)(deletes$)
inserts$.subscribe(items => {
if (Math.random() < 0.5) {
done$.next(`inserted [${items}], retry num ${retryNum}`);
} else {
items.forEach(item => {
retryTopics[retryNum + 1].next({
action: "insert", item: item
});
});
}
});
deletes$.subscribe(items => {
done$.next(`deleted [${items}], retry num ${retryNum}`);
});
}
[topic$, ...retryTopics].forEach((topic$, index) => {
process(topic$, index)
});
done$
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment