Skip to content

Instantly share code, notes, and snippets.

@cowboyd
Last active July 18, 2023 08:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cowboyd/688e795494d7d667c6c39928e9bf286a to your computer and use it in GitHub Desktop.
Save cowboyd/688e795494d7d667c6c39928e9bf286a to your computer and use it in GitHub Desktop.
Throttle any Effection stream using a fixed-size buffer
/**
 * Takes a buffer limit and returns a stream combinator: a function that wraps
 * a source stream in a new stream which relays the source's items through an
 * internal channel, using a holding array capped at `limit` items.
 *
 * Use with pipe:
 * ```ts
 * let doubleclicks = pipe(events, buffer(200), filter(isDoubleClick));
 * ```
 * Or as a standalone:
 * ```ts
 * let buffered = buffer(200)(events);
 * ```
 *
 * The holding array and the channel are created inside the returned stream's
 * generator body, so nothing is allocated until that body actually runs.
 *
 * Current limitations (the overflow policy is still an open design question):
 * - Each item is placed in the holding array and immediately taken out again
 *   to be sent, so the array is always empty when the limit is checked. For
 *   any `limit >= 1` the overflow branch is therefore never reached.
 * - For `limit <= 0` the limit check always fails and every item is silently
 *   discarded; only the source's close value is forwarded.
 *
 * @param limit maximum number of items allowed in the holding array
 * @returns a combinator mapping a `Stream<T, TClose>` to a `Stream<T, TClose>`
 */
export function buffer(limit: number): <T, TClose>(stream: Stream<T, TClose>) => Stream<T, TClose> {
  // The type parameters are declared on the arrow itself so that `T` and
  // `TClose` can be named inside the body; a contextually typed arrow does not
  // bring the parameters of the expected signature into scope.
  return <T, TClose>(source: Stream<T, TClose>) =>
    function* () {
      // Holding area for items that were received but not yet sent.
      const pending: T[] = [];
      const { port, stream } = createChannel<T, TClose>();

      // Pump the source into the channel from a spawned task so that this
      // generator can go on to return the channel's stream below.
      yield* spawn(function* () {
        // NOTE(review): assumes `source` can be delegated to directly to
        // obtain its subscription - confirm against the Effection version used.
        const subscription = yield* source;
        while (true) {
          const next = yield* subscription.next();
          if (next.done) {
            // Propagate the source's close value, then stop pumping.
            yield* port.close(next.value);
            break;
          } else if (pending.length < limit) {
            // FIFO: enqueue at the front, dequeue from the back.
            pending.unshift(next.value);
            // `pop()` cannot return undefined here because an item was just
            // enqueued, hence the assertion.
            yield* port.send(pending.pop() as T);
          } else {
            // buffer overflow. What to do?
            // Until a policy is chosen, the item is dropped without notice.
          }
        }
      });

      return yield* stream;
    };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment