Created
November 3, 2023 15:31
-
-
Save cowboyd/db8498791c5238cba101675fc68dc936 to your computer and use it in GitHub Desktop.
A combinator to buffer any stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { type Stream, createQueue, spawn } from "effection";
/**
 * Build a stream combinator that puts a bounded queue between a source
 * stream and whoever subscribes to the result.
 *
 * Every call to `subscribe()` on the returned stream creates a fresh queue
 * and spawns an operation that subscribes to the source and pumps each item
 * into that queue. The subscriber then reads from the queue instead of
 * reading from the source directly. When the source finishes, its close
 * value is forwarded by closing the queue with it.
 *
 * @param limit - value handed to the queue as its `limit` option; items
 *   received beyond it are meant to overflow and be dropped.
 * @returns a function that takes a stream and returns a buffered stream
 *   with the same item and close types.
 *
 * @example
 * let clicks: Stream<MouseEvent> = pipe(on(button, 'click'), buffer(100));
 */
export function buffer<T, TClose>(limit: number): (stream: Stream<T, TClose>) => Stream<T, TClose> {
  return function (stream) {
    return {
      *subscribe() {
        // The queue receives every message from the source. It is intended
        // to drop overflow once `limit` is reached, discarding the most
        // recently received message ("latest").
        // Named `queue` so it does not shadow the enclosing `buffer` function.
        // NOTE(review): assumes the effection version in use accepts
        // `limit`/`drop` options on createQueue — confirm against its API.
        let queue = createQueue<T, TClose>({
          limit,
          drop: "latest",
        });

        // Spawn an operation that consumes the source and "pumps" it into
        // the queue. If too many items arrive they overflow, which is the
        // desired behavior here.
        yield* spawn(function* () {
          let subscription = yield* stream.subscribe();
          let next = yield* subscription.next();
          while (!next.done) {
            queue.add(next.value);
            next = yield* subscription.next();
          }
          // The source is done: hand its close value on to the queue's reader.
          queue.close(next.value);
        });

        // Our own subscription simply delegates to the queue.
        return {
          *next() {
            return yield* queue.next();
          },
        };
      },
    };
  };
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment