Skip to content

Instantly share code, notes, and snippets.

@crdrost
Created January 12, 2018 13:12
Show Gist options
  • Save crdrost/2edb930143630866eedae871f330099e to your computer and use it in GitHub Desktop.
Save crdrost/2edb930143630866eedae871f330099e to your computer and use it in GitHub Desktop.
Bidirectional streams inspired by @andrestaltz
// Template tag that JSON-encodes every interpolated value and leaves the literal
// text untouched. Only shape() uses it, to build its error message.
const in_json = (strs, ...vals) => {
  const pieces = [];
  for (const [idx, val] of vals.entries()) {
    // Each value is followed by the literal chunk that trails it in the template.
    pieces.push(JSON.stringify(val) + strs[idx + 1]);
  }
  return strs[0] + pieces.join("");
};
/**
 * Asserts that `obj` exposes every named method; throws otherwise.
 *
 * PLEASE use this instead of guarding calls with `if (typeof obj.x === "function")`:
 * a silent guard makes "this stage does not handle that event" unassertable, and you
 * end up with undebuggable pipelines where information gets lost somewhere and nobody
 * can tell where.
 *
 * @param {*} obj - Candidate object. `null`/`undefined` are reported as having no
 *   methods rather than crashing with an unrelated TypeError.
 * @param {...string} method_names - Names that must resolve to functions on `obj`.
 * @throws {Error} "shape mismatch, ..." naming the expected methods and the own
 *   enumerable function-valued keys actually found on `obj`.
 */
const shape = (obj, ...method_names) => {
  // The null check lives inside the predicate so that an empty name list still
  // passes for any `obj`, exactly as it did before.
  const has_all_methods = method_names.every(
    m => obj != null && typeof obj[m] === "function"
  );
  if (has_all_methods) {
    return;
  }
  const found_methods =
    obj == null ? [] : Object.keys(obj).filter(k => typeof obj[k] === "function");
  throw new Error(
    in_json`shape mismatch, expected methods ${method_names}, got methods ${found_methods}`
  );
};
/**
 * Wraps `source` so that every value it pushes through `next` is passed through
 * `transform` before it reaches the subscriber.
 *
 * The subscriber is never handed to `source` directly. `source` receives a proxy
 * object that forwards each of the subscriber's own enumerable methods, with `next`
 * replaced by the transforming version.
 *
 * @param {{subscribe: Function}} source - Object whose `subscribe` accepts a sink.
 * @param {Function} transform - Applied to each value given to the proxy's `next`.
 * @returns {{subscribe: Function}} An object with a `subscribe(sink)` method.
 */
const mapNext = (source, transform) => {
  return {
    subscribe(sink) {
      const sinkM = {};
      for (const k of Object.keys(sink)) {
        // Forward only real methods. Wrapping a non-function property would make the
        // proxy claim a method that blows up when called, and would let a
        // `typeof proxy[k] === "function"` check pass when it should fail.
        if (typeof sink[k] === "function") {
          // Looked up at call time so later reassignment of sink[k] is respected.
          sinkM[k] = (...args) => sink[k](...args);
        }
      }
      sinkM.next = data => {
        sink.next(transform(data));
      };
      source.subscribe(sinkM);
    },
  };
};
/**
 * Counter source with a back-channel.
 *
 * `subscribe(other)` pushes an increasing counter (starting at 0) to `other.next`
 * once every 1000 ms. It then calls `other.subscribe` with a control object so the
 * subscriber can talk back: `next()` resets the counter to 0 and `dispose()` stops
 * the timer.
 *
 * @throws {Error} via shape() when `other` lacks `subscribe` or `next`; also rethrows
 *   anything thrown by `other.subscribe`, after stopping the timer.
 */
const interval = {
  subscribe(other) {
    shape(other, "subscribe", "next");
    // Declared ahead of the timer callback that closes over it.
    let i = 0;
    const handle = setInterval(() => other.next(i++), 1000);
    try {
      other.subscribe({
        next() {
          i = 0;
        },
        dispose() {
          clearInterval(handle);
        },
      });
    } catch (err) {
      // The subscriber never received dispose(), so nobody else can stop this timer.
      clearInterval(handle);
      throw err;
    }
    // you were also returning dispose() above, but that seems nonidiomatic for this
    // sort of code where subscribe() is apparently only important for its side-effects.
  },
};
// Sink that prints whatever it is handed. When given a control object through
// subscribe(), it validates it and pokes that object's next() once, 4500 ms later.
const logger = {
  subscribe(other) {
    shape(other, "next");
    const delayMs = 4500;
    const pokeOther = () => other.next();
    setTimeout(pokeOther, delayMs);
  },
  next(value) {
    console.log(value);
  },
};
// Demo wiring: interval pushes its counter to logger.next every 1000 ms, and logger's
// subscribe() schedules a single call, 4500 ms after subscribing, to the control
// object's next(), which resets the counter to 0. logger never calls dispose(), so
// nothing in this file stops the timer.
interval.subscribe(logger);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment