Skip to content

Instantly share code, notes, and snippets.

@cowboyd
Last active August 10, 2023 19:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cowboyd/5133c46db9ef39d67b4678faa1f3bacc to your computer and use it in GitHub Desktop.
Save cowboyd/5133c46db9ef39d67b4678faa1f3bacc to your computer and use it in GitHub Desktop.
implement a stream zip using Effection
import {
all,
createChannel,
type Operation,
resource,
spawn,
type Stream,
} from "effection";
export function zip<T>(streams: Stream<T, never>[]): Stream<T[], never> {
return createStream(function* (publish) {
let subscriptions = [];
for (let stream of streams) {
subscriptions.push(yield* stream);
}
while (true) {
let result = yield* all(subscriptions.map((sub) => sub.next()));
yield* publish(result.map((result) => result.value));
}
});
}
/**
* takes a "publisher" which is an operation that that takes a "publish" operation
* as its argument.
*
* ```javascript
* export function toSlowStream(array) {
* return createStream(function*(publish) {
* let start = new Date();
* for (let item of arry) {
* yield* sleep(100);
* yield* publish(item);
* }
* let end = new Date();
* let totalTime = end - start;
* return totalTime;
* });
*/
function createStream<T, TClose>(
publisher: (publish: (value: T) => Operation<void>) => Operation<TClose>,
): Stream<T, TClose> {
return resource(function* (provide) {
let { input, output } = createChannel<T, TClose>();
yield* spawn(function* () {
let result = yield* publisher(input.send);
yield* input.close(result);
});
yield* provide(yield* output);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment