@cowboyd
Created November 3, 2023 15:31
A combinator to buffer any stream
import { type Stream, createQueue, spawn } from "effection";
/**
 * @example
 * let clicks: Stream<MouseEvent> = pipe(on(button, 'click'), buffer(100));
 */
export function buffer<T, TClose>(limit: number): (stream: Stream<T, TClose>) => Stream<T, TClose> {
  return function(stream) {
    return {
      *subscribe() {
        // The buffer receives every message from the source.
        // Once `limit` is reached, overflow is dropped; with
        // drop: "latest", the most recently received message is dropped.
        let buffer = createQueue<T, TClose>({
          limit,
          drop: "latest",
        });

        // Spawn an operation to consume the source and "pump" it
        // into the buffer. If too many items are received,
        // they will overflow, which is what we want.
        yield* spawn(function*() {
          let subscription = yield* stream.subscribe();
          let next = yield* subscription.next();
          while (!next.done) {
            buffer.add(next.value);
            next = yield* subscription.next();
          }
          // Forward the source's close value to the buffer.
          buffer.close(next.value);
        });

        // We now delegate to the buffer for our own subscription.
        return {
          *next() {
            return yield* buffer.next();
          }
        };
      }
    };
  };
}
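
The overflow behaviour described in the comments above can be illustrated without effection at all. The following is a minimal sketch, not the library's queue: `BoundedBuffer`, `DropPolicy`, and the `"oldest"` policy are hypothetical names introduced only for this illustration. It shows what it means for a buffer with a limit to drop the most recently received item once it is full.

```typescript
// Hypothetical stand-in for a bounded queue; this is NOT effection's createQueue.
type DropPolicy = "latest" | "oldest";

class BoundedBuffer<T> {
  private items: T[] = [];
  private limit: number;
  private drop: DropPolicy;

  constructor(limit: number, drop: DropPolicy = "latest") {
    this.limit = limit;
    this.drop = drop;
  }

  // Returns true if the item was stored, false if it was discarded.
  add(item: T): boolean {
    if (this.items.length < this.limit) {
      this.items.push(item);
      return true;
    }
    if (this.drop === "oldest" && this.limit > 0) {
      // Make room by evicting the item that has waited longest.
      this.items.shift();
      this.items.push(item);
      return true;
    }
    // "latest": the buffer is full, so the incoming item is discarded.
    return false;
  }

  // Removes and returns the oldest buffered item, if any.
  take(): T | undefined {
    return this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }
}

// A fast producer pushes five items into a buffer of three.
const demoBuffer = new BoundedBuffer<number>(3, "latest");
const accepted = [1, 2, 3, 4, 5].map((n) => demoBuffer.add(n));
console.log(accepted);        // [ true, true, true, false, false ]
console.log(demoBuffer.take()); // 1
```

With the `"latest"` policy a slow consumer sees the earliest items and misses whatever arrived while the buffer was full, which is the trade-off `buffer(limit)` makes in exchange for bounded memory.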