Created
April 19, 2023 19:39
-
-
Save cowboyd/eb9de6cdca87f55f6103f19c5ab7e1c2 to your computer and use it in GitHub Desktop.
Hypothetical composable stream operations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { spawn, pipe, type Operation } from "./mod.ts";
/**
 * A stream is an operation that, once run, produces a `Subscription` from
 * which items of type `T` can be pulled one at a time. The subscription ends
 * with a closing value of type `TClose`.
 */
export type Stream<T, TClose> = Operation<Subscription<T, TClose>>;
/**
 * A pull-based handle on a sequence of values.
 *
 * @typeParam T - type of each item produced while the sequence is open.
 * @typeParam TClose - type of the value carried by the final, `done` result.
 */
export interface Subscription<T, TClose> {
  /**
   * Operation that resolves to the next result: `{ done: false, value: T }`
   * for an item, or `{ done: true, value: TClose }` once the sequence is over.
   */
  next(): Operation<IteratorResult<T, TClose>>;
}
/**
 * A `Subscription` that additionally exposes the items it is holding.
 *
 * NOTE(review): this name shadows the Node.js global `Buffer` inside this
 * module; kept as-is because it is part of the exported interface.
 */
export interface Buffer<T, TClose> extends Subscription<T, TClose> {
  /** Iterable view of held items — presumably those not yet consumed; TODO confirm. */
  contents: Iterable<T>;
}
/**
 * Build a stream transformer that runs `op` on every item as it is pulled.
 *
 * The returned stream yields exactly the same results as the source: each
 * item passes through unchanged after `op` has completed for it, and the
 * closing result is forwarded without invoking `op`.
 *
 * @param op - operation run for each item; its result is discarded.
 * @returns a function mapping a source stream to the observed stream.
 */
export function forEach<T, TClose>(op: (item: T) => Operation<void>): (stream: Stream<T, TClose>) => Stream<T, TClose> {
  return (stream) => ({
    *[Symbol.iterator]() {
      const subscription = yield* stream;
      return {
        *next() {
          const next = yield* subscription.next();
          // Only real items are handed to `op`; the closing result is not.
          if (!next.done) {
            yield* op(next.value);
          }
          return next;
        },
      };
    },
  });
}
/**
 * Build a stream transformer that replaces every item with the result of
 * running `op` on it.
 *
 * The closing result of the source stream is forwarded unchanged and `op`
 * is not invoked for it.
 *
 * @param op - operation that converts an item of type `T` into an `X`.
 * @returns a function mapping a source stream to the transformed stream.
 */
export function map<T, X, TClose>(op: (item: T) => Operation<X>): (stream: Stream<T, TClose>) => Stream<X, TClose> {
  return (stream) => ({
    *[Symbol.iterator]() {
      const subscription = yield* stream;
      return {
        *next() {
          const next = yield* subscription.next();
          if (next.done) {
            return next;
          }
          const value = yield* op(next.value);
          // Keep the rest of the source result and swap in the mapped value.
          return { ...next, value };
        },
      };
    },
  });
}
/**
 * Build a stream transformer that only lets through items for which `op`
 * resolves to `true`.
 *
 * Each call to `next()` on the resulting subscription keeps pulling from the
 * source until an item is accepted or the source closes. The closing result
 * is forwarded unchanged and is never passed to `op`.
 *
 * @param op - predicate operation evaluated for every source item.
 * @returns a function mapping a source stream to the filtered stream.
 */
export function filter<T, TClose>(op: (item: T) => Operation<boolean>): (stream: Stream<T, TClose>) => Stream<T, TClose> {
  return (stream) => ({
    *[Symbol.iterator]() {
      const subscription = yield* stream;
      return {
        *next() {
          while (true) {
            const next = yield* subscription.next();
            // Stop on the closing result, or on the first accepted item;
            // rejected items are dropped and the loop pulls again.
            if (next.done || (yield* op(next.value))) {
              return next;
            }
          }
        },
      };
    },
  });
}
/** Configuration passed to `buffer` and forwarded to the ring buffer it creates. */
export interface BufferOptions {
  /** Presumably the maximum number of items retained — TODO confirm against the ring buffer implementation. */
  size: number;
  /** Presumably which end is discarded once `size` is exceeded — TODO confirm. */
  drop: 'newest' | 'oldest';
}
/**
 * Build a stream transformer that drains the source into a ring buffer in a
 * spawned task and hands back that buffer.
 *
 * Running the returned operation creates the ring buffer, spawns a task that
 * subscribes to `stream` and pushes every item into the buffer, and resolves
 * to the buffer's `memory`. When the source closes, the buffer is closed
 * with the source's closing value.
 *
 * The source is drained by calling `subscription.next()` directly. Composing
 * with `forEach` would only produce another lazy stream whose subscription
 * nobody pulls from, so no item would ever be pushed and `close` would
 * receive a subscription rather than the closing value.
 *
 * NOTE(review): `createRingBuffer` is not defined in the visible part of this
 * file; it is assumed to return `{ push, memory }` where `memory` satisfies
 * `Buffer<T, TClose>` and has a `close(value)` method — confirm.
 *
 * @param options - size/drop policy forwarded to `createRingBuffer`.
 * @returns a function mapping a source stream to an operation that resolves
 *          to the buffer.
 */
export function buffer<T, TClose>(options: BufferOptions): (stream: Stream<T, TClose>) => Operation<Buffer<T, TClose>> {
  return (stream) => ({
    *[Symbol.iterator]() {
      const { push, memory } = createRingBuffer(options);
      yield* spawn(function* () {
        const subscription = yield* stream;
        while (true) {
          const next = yield* subscription.next();
          if (next.done) {
            // Propagate the source's closing value to the buffer.
            memory.close(next.value);
            return;
          }
          push(next.value);
        }
      });
      return memory;
    },
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment