Skip to content

Instantly share code, notes, and snippets.

@barelyhuman
Created April 16, 2022 23:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save barelyhuman/94331d65c1bb3b565148c474d3b816f9 to your computer and use it in GitHub Desktop.
Save barelyhuman/94331d65c1bb3b565148c474d3b816f9 to your computer and use it in GitHub Desktop.
wonka (wonka.kitten.sh) , custom operators code
import { fromArray, fromValue, pipe, toPromise } from "wonka";
function talkbackPlaceholder(a) {}
const customTake = (max) => (source) => (sink) => {
let taken = [];
let talkback = talkbackPlaceholder;
let ended = false;
return source((signal) => {
if (typeof signal === "number") {
// stream has ended, handle it
fromValue(taken)(sink);
} else {
if (signal.tag) {
// data is available and can be transformed / triggered/ read/ etc
if (taken.length < max) {
taken.push(signal[0]);
talkback(0);
if (taken.length === max && !ended) {
// we're done with reading, stop here
talkback(1);
fromValue(taken)(sink);
}
}
} else {
console.log("assign talkback");
// talkback is available and should be assigned
talkback = signal[0];
}
// either way, trigger a next
talkback(0);
}
});
};
function sum(source) {
return function (sink) {
let total = 0;
let talkback = talkbackPlaceholder;
return source(function (signal) {
if (typeof signal == "number") {
fromValue(total)(sink);
return;
}
if (signal.tag) {
total += parseInt(signal[0], 10);
talkback(0);
} else {
talkback = signal[0];
}
talkback(0);
});
};
}
async function main() {
const result = await pipe(fromArray([1, 2, 3, 4]), sum, toPromise);
console.log({ result });
}
main();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment