Skip to content

Instantly share code, notes, and snippets.

@Lucifier129
Created July 8, 2022 09:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Lucifier129/cf9c6668709ee76649bf05a8280a32a4 to your computer and use it in GitHub Desktop.
Save Lucifier129/cf9c6668709ee76649bf05a8280a32a4 to your computer and use it in GitHub Desktop.
push-stream via codata
function pipe<A, B>(a: A, f: (a: A) => B): B;
function pipe<A, B, C>(a: A, f: (a: A) => B, g: (b: B) => C): C;
function pipe<A, B, C, D>(
a: A,
f: (a: A) => B,
g: (b: B) => C,
h: (c: C) => D
): D;
function pipe<A, B, C, D, E>(
a: A,
f: (a: A) => B,
g: (b: B) => C,
h: (c: C) => D,
i: (d: D) => E
): E;
function pipe<A, B, C, D, E, F>(
a: A,
f: (a: A) => B,
g: (b: B) => C,
h: (c: C) => D,
i: (d: D) => E,
j: (e: E) => F
): F;
function pipe<A, B, C, D, E, F, G>(
a: A,
f: (a: A) => B,
g: (b: B) => C,
h: (c: C) => D,
i: (d: D) => E,
j: (e: E) => F,
k: (f: F) => G
): G;
function pipe<A, B, C, D, E, F, G, H>(
a: A,
f: (a: A) => B,
g: (b: B) => C,
h: (c: C) => D,
i: (d: D) => E,
j: (e: E) => F,
k: (f: F) => G,
l: (g: G) => H
): H;
function pipe<A, B, C, D, E, F, G, H, I>(
a: A,
f: (a: A) => B,
g: (b: B) => C,
h: (c: C) => D,
i: (d: D) => E,
j: (e: E) => F,
k: (f: F) => G,
l: (g: G) => H,
m: (h: H) => I
): I;
function pipe(a: any, ...fns: any[]): any {
return fns.reduce((a, f) => f(a), a);
}
type StreamConsumer<T> = {
next: (value: T) => unknown;
done: () => unknown;
};
type StreamHandler = {
ignite: () => void;
finish: () => void;
};
type Stream<T> = {
subscribe: (consumer: StreamConsumer<T>) => StreamHandler;
};
const map = <T, R>(mapper: (value: T) => R) => {
return (stream: Stream<T>): Stream<R> => {
const subscribe = (consumer: StreamConsumer<R>) => {
return stream.subscribe({
next: (value) => consumer.next(mapper(value)),
done: () => consumer.done(),
});
};
return {
subscribe,
};
};
};
const filter = <T>(predicate: (value: T) => boolean) => {
return (stream: Stream<T>): Stream<T> => {
return {
subscribe: (consumer: StreamConsumer<T>) => {
return stream.subscribe({
next: (value) => {
if (predicate(value)) {
consumer.next(value);
}
},
done: () => consumer.done(),
});
},
};
};
};
const take = <T>(max: number) => {
return (stream: Stream<T>): Stream<T> => {
return {
subscribe: (consumer: StreamConsumer<T>) => {
let counter = 0;
const handler = stream.subscribe({
next: (value) => {
if (counter < max) {
consumer.next(value);
counter++;
}
if (counter >= max) {
consumer.done();
}
},
done: () => consumer.done(),
});
return handler;
},
};
};
};
const reverse = <T>(stream: Stream<T>): Stream<T> => {
return {
subscribe: (consumer: StreamConsumer<T>) => {
let values = [] as T[];
const handler = stream.subscribe({
next: (value) => {
values.push(value);
},
done: () => {
values.reverse().forEach((value) => consumer.next(value));
consumer.done();
},
});
return handler;
},
};
};
const range = (start: number, end: number, step = 1): Stream<number> => {
return {
subscribe: (consumer: StreamConsumer<number>) => {
const ignite = () => {
for (let i = start; i < end; i += step) {
consumer.next(i);
}
consumer.done();
};
return {
ignite: () => ignite(),
finish: () => {},
};
},
};
};
const forEach = <T>(fn: (value: T) => void) => (stream: Stream<T>) => {
const handler = stream.subscribe({
next: (value) => {
fn(value);
},
done: () => {},
});
handler.ignite();
return handler;
};
pipe(
range(0, 20),
map((value) => value * 2),
reverse,
forEach((value) => console.log(value)),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment