Skip to content

Instantly share code, notes, and snippets.

@r-cyr
Last active December 11, 2021 04:35
Show Gist options
  • Save r-cyr/59447421d69e66ecd7988f611eddd2a5 to your computer and use it in GitHub Desktop.
Save r-cyr/59447421d69e66ecd7988f611eddd2a5 to your computer and use it in GitHub Desktop.
import * as T from "@effect-ts/core/Effect";
import * as E from "@effect-ts/core/Either";
import * as M from "@effect-ts/core/Effect/Managed";
import * as CS from "@effect-ts/core/Effect/Cause";
import * as CK from "@effect-ts/core/Collections/Immutable/Chunk";
import * as S from "@effect-ts/core/Effect/Experimental/Stream";
import * as CH from "@effect-ts/core/Effect/Experimental/Stream/Channel";
import { pipe } from "@effect-ts/core/Function";
import { runMain } from "@effect-ts/node/Runtime";
import * as Byte from "@effect-ts/node/Byte";
import { Transform, TransformCallback } from "stream";
class PlusTwoTransform extends Transform {
_transform(
chunk: Buffer,
encoding: BufferEncoding,
callback: TransformCallback
): void {
this.push(chunk.map((x) => x + 2));
callback();
}
}
class TransformError extends Error {
readonly _tag = "TransformError";
constructor(message: string, readonly options: { cause?: unknown } = {}) {
super(message);
this.name = this._tag;
}
}
class CheckedError extends Error {
readonly _tag = "CheckedError";
constructor(readonly error: Error) {
super("A checked error has occured");
this.name = this._tag;
}
}
class UncheckedError extends Error {
readonly _tag = "UncheckedError";
constructor(readonly cause: CS.Cause<any>) {
super("An unchecked error has occured");
this.name = this._tag;
}
}
function transform(tr: () => Transform) {
return <R, E>(self: S.Stream<R, E, Byte.Byte>) =>
S.unwrapManaged(
M.gen(function* (_) {
const input = yield* _(
T.toManagedRelease_(T.succeedWith(tr), (tr) =>
T.succeedWith(() => {
tr.destroy();
})
)
);
const reader: CH.Channel<
unknown,
E,
CK.Chunk<Byte.Byte>,
unknown,
E,
never,
any
> = CH.readWithCause(
(in_) =>
CH.zipRight_(
CH.fromEffect(
pipe(
T.effectAsync<unknown, TransformError, void>((cb) => {
input.write(Byte.buffer(in_), "binary", (err) => {
if (err) {
cb(
T.fail(
new TransformError(
"Failure while writing to the transform stream",
{ cause: err }
)
)
);
} else {
cb(T.succeed(void 0));
}
});
}),
T.catchAllCause((cause) =>
T.succeedWith(() => {
E.fold_(
CS.failureOrCause(cause),
(failure) => {
input.destroy(
new CheckedError(failure as unknown as Error)
);
},
(cause) => {
input.destroy(new UncheckedError(cause));
}
);
})
)
)
),
reader
),
(cause) =>
CH.zipRight_(
CH.fromEffect(
T.succeedWith(() =>
E.fold_(
CS.failureOrCause(cause),
(failure) => {
input.destroy(
new CheckedError(failure as unknown as Error)
);
},
(cause) => {
input.destroy(new UncheckedError(cause));
}
)
)
),
CH.end(void 0)
),
(end) =>
CH.zipRight_(
CH.fromEffect(
T.succeedWith(() => {
input.end();
})
),
CH.end(end)
)
);
const writer = S.async<unknown, E | TransformError, Byte.Byte>(
(emit) => {
input
.on("end", () => {
emit.end();
})
.on("error", (err) => {
if (err instanceof CheckedError) {
emit.fail(err as unknown as E | TransformError);
} else if (err instanceof UncheckedError) {
emit.halt(err.cause);
} else {
emit.die(err);
}
})
.on("data", (buffer) => {
emit.chunk(Byte.chunk(buffer));
});
}
);
return S.crossRight_(
S.managed(M.fork(CH.runManaged(self.channel[">>>"](reader)))),
writer
);
})
);
}
pipe(
S.range(1, 25) as S.UIO<Byte.Byte>,
transform(() => new PlusTwoTransform()),
S.runCollect,
T.chain((result) =>
T.succeedWith(() => {
console.log([...result]);
})
),
runMain
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment