Skip to content

Instantly share code, notes, and snippets.

@schickling
Created December 30, 2022 21:02
Show Gist options
  • Save schickling/399ca78de10aa604c74f37d29aaa7c42 to your computer and use it in GitHub Desktop.
Save schickling/399ca78de10aa604c74f37d29aaa7c42 to your computer and use it in GitHub Desktop.
bufferUpTp.ts
import { Cause, Chunk, Effect as T, Exit as Ex, Managed as M, Option as O, pipe, Queue as Q } from '@effect-ts/core'
import * as S from '@effect-ts/core/Effect/Experimental/Stream'
import * as CH from '@effect-ts/core/Effect/Experimental/Stream/Channel'
export const bufferUpTo =
(n: number) =>
<R, E, A>(stream: S.Stream<R, E, A>) => {
const queue = toQueueOfElements_(stream, n)
return new S.Stream(
CH.managed_(queue, (queue) => {
const process: CH.Channel<unknown, unknown, unknown, unknown, E, Chunk.Chunk<A>, void> = CH.chain_(
CH.fromEffect(pipe(Q.takeBetween_(queue, 1, n), T.chain(T.forEach(T.done)), T.result)),
Ex.fold(
(_) =>
O.fold_(
Cause.flipCauseOption(_),
() => CH.end(void 0),
(_) => CH.failCause(_),
),
(values) => CH.zipRight_(CH.write(Chunk.from(values)), process),
),
)
return process
}),
)
}
const runIntoElementsManaged_ = <R, R1, E, A>(
self: S.Stream<R, E, A>,
queue: Q.XQueue<R1, never, never, unknown, Ex.Exit<O.Option<E>, A>, any>,
): M.Managed<R & R1, E, void> => {
const writer = (): CH.Channel<R1, E, Chunk.Chunk<A>, unknown, never, Chunk.Chunk<Ex.Exit<O.Option<E>, A>>, any> =>
CH.readWith(
(in_) => CH.zipRight_(CH.write(Chunk.map_(in_, Ex.succeed)), writer()),
(err) => CH.write(Chunk.single(Ex.fail(O.some(err)))),
(_) => CH.write(Chunk.single(Ex.fail(O.none))),
)
return pipe(
self.channel['>>>'](writer()),
CH.mapOutEffect((_) => Q.offerAll_(queue, _)),
CH.drain,
CH.runManaged,
M.asUnit,
)
}
const toQueueOfElements_ = <R, E, A>(
self: S.Stream<R, E, A>,
capacity = 2,
): M.RIO<R, Q.Queue<Ex.Exit<O.Option<E>, A>>> => {
return pipe(
M.do,
M.bind('queue', () => T.toManagedRelease_(Q.makeBounded<Ex.Exit<O.Option<E>, A>>(capacity), Q.shutdown)),
M.tap(({ queue }) => M.fork(runIntoElementsManaged_(self, queue))),
M.map(({ queue }) => queue),
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment