Last active
July 18, 2023 08:38
-
-
Save cowboyd/688e795494d7d667c6c39928e9bf286a to your computer and use it in GitHub Desktop.
Throttle any Effection stream using a fixed size buffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Takes a buffer limit, and returns a stream combinator that converts a stream into stream | |
* limits the number of in flight items to that limit when it is subscribed to. Use with pipe: | |
* ```ts | |
* let doubleclicks = pipe(events, buffer(200), filter(isDoubleClick)); | |
* ``` | |
* Or as a standalone: | |
* let buffered = buffer(200)(events); | |
* | |
* No buffer is actually allocated until the resulting stream is subscribed to. | |
*/ | |
export function buffer(limit: number): <T,TClose>(stream: Stream<T,TClose>) => Stream<T,TClose> { | |
return (source) => function* () { | |
let buffer: T[] = []; | |
let { port, stream } = createChannel<T, TClose>(); | |
yield* spawn(function*() { | |
let subscription = yield* source; | |
while (true) { | |
let next = yield* subscription.next(); | |
if (next.done) { | |
yield* port.close(next.value); | |
break; | |
} else if (buffer.length < limit) { | |
buffer.unshift(next.value); | |
yield* port.send(buffer.pop() as T); | |
} else { | |
// buffer overflow. What to do? | |
} | |
} | |
}); | |
return yield* stream; | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment