Last active
August 10, 2023 19:25
-
-
Save cowboyd/5133c46db9ef39d67b4678faa1f3bacc to your computer and use it in GitHub Desktop.
implement a stream zip using Effection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {
  all,
  createChannel,
  type Operation,
  resource,
  spawn,
  type Stream,
  suspend,
} from "effection";
export function zip<T>(streams: Stream<T, never>[]): Stream<T[], never> { | |
return createStream(function* (publish) { | |
let subscriptions = []; | |
for (let stream of streams) { | |
subscriptions.push(yield* stream); | |
} | |
while (true) { | |
let result = yield* all(subscriptions.map((sub) => sub.next())); | |
yield* publish(result.map((result) => result.value)); | |
} | |
}); | |
} | |
/** | |
* takes a "publisher" which is an operation that that takes a "publish" operation | |
* as its argument. | |
* | |
* ```javascript | |
* export function toSlowStream(array) { | |
* return createStream(function*(publish) { | |
* let start = new Date(); | |
* for (let item of arry) { | |
* yield* sleep(100); | |
* yield* publish(item); | |
* } | |
* let end = new Date(); | |
* let totalTime = end - start; | |
* return totalTime; | |
* }); | |
*/ | |
function createStream<T, TClose>( | |
publisher: (publish: (value: T) => Operation<void>) => Operation<TClose>, | |
): Stream<T, TClose> { | |
return resource(function* (provide) { | |
let { input, output } = createChannel<T, TClose>(); | |
yield* spawn(function* () { | |
let result = yield* publisher(input.send); | |
yield* input.close(result); | |
}); | |
yield* provide(yield* output); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment