Skip to content

Instantly share code, notes, and snippets.

@cowboyd
Created April 19, 2023 19:39
Show Gist options
  • Save cowboyd/eb9de6cdca87f55f6103f19c5ab7e1c2 to your computer and use it in GitHub Desktop.
Save cowboyd/eb9de6cdca87f55f6103f19c5ab7e1c2 to your computer and use it in GitHub Desktop.
Hypothetical composable stream operations
import { spawn, pipe, type Operation } from "./mod.ts";
export type Stream<T, TClose> = Operation<Subscription<T, TClose>>;
export interface Subscription<T, TClose> {
next(): Operation<IteratorResult<T, TClose>>;
}
export interface Buffer<T, TClose> extends Subscription<T, TClose> {
contents: Iterable<T>;
}
export function forEach<T,TClose>(op: (item: T) => Operation<void>): (stream: Stream<T, TClose>) => Stream<T, TClose> {
return (stream) => ({
*[Symbol.iterator]() {
let subscription = yield* stream;
return {
*next() {
let next = yield* subscription.next();
if (!next.done) {
yield* op(next.value);
}
return next;
}
}
}
});
}
export function map<T, X, TClose>(op: (item: T) => Operation<X>): (stream: Stream<T, TClose>) => Stream<X, TClose> {
return stream => ({
*[Symbol.iterator]() {
let subscription = yield* stream;
return {
*next() {
let next = yield* subscription.next()
if (!next.done) {
return {...next, value: yield* op(next.value) }
} else {
return next;
}
}
}
}
})
}
export function filter<T, TClose>(op: (item: T) => Operation<boolean>): (stream: Stream<T, TClose>) => Stream<T, TClose> {
return stream => ({
*[Symbol.iterator]() {
let subscription = yield* stream;
return {
*next() {
while (true) {
let next = yield* subscription.next()
if (next.done) {
return next;
} else if (yield* op(next.value)) {
return next;
}
}
}
}
}
})
}
export interface BufferOptions {
size: number;
drop: 'newest' | 'oldest'
}
export function buffer<T, TClose>(options: BufferOptions): (stream: Stream<T, TClose>) => Buffer<T, TClose> {
return stream => ({
*[Symbol.iterator]() {
let { push, memory } = createRingBuffer(options);
yield* spawn(function*() {
let result = yield* pipe(stream, forEach(function*(item) {
push(item);
}));
memory.close(result);
});
return memory;
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment