Skip to content

Instantly share code, notes, and snippets.

@titouancreach
Last active July 19, 2024 09:19
Show Gist options
  • Save titouancreach/5532b2d3a0020541cbe3206bfce4e837 to your computer and use it in GitHub Desktop.
Save titouancreach/5532b2d3a0020541cbe3206bfce4e837 to your computer and use it in GitHub Desktop.
port { Chunk, Console, Context, Effect, Layer, Stream } from "effect";
import amqp from "amqplib";
import { NodeRuntime } from "@effect/platform-node";
const make = Effect.gen(function* () {
const connection = yield* Effect.tryPromise(() =>
amqp.connect(
"amqps://***",
),
);
const sendMessage = (queueName: string, message: string) =>
Effect.gen(function* () {
const chan = yield* Effect.promise(() => connection.createChannel());
yield* Effect.promise(() => chan.assertQueue(queueName));
chan.sendToQueue(queueName, Buffer.from(message));
});
const getMessageStream = (
queueName: string,
): Stream.Stream<amqp.Message, never, never> =>
Stream.asyncScoped((emit) =>
Effect.acquireRelease(
Effect.gen(function* () {
const chan = yield* Effect.promise(() => connection.createChannel());
yield* Effect.promise(() => chan.assertQueue(queueName));
chan.consume(queueName, (msg) => {
if (msg !== null) {
emit.chunk(Chunk.of(msg));
}
});
return chan;
}),
(chan) => Effect.promise(() => chan.close()),
),
);
return { getMessageStream, sendMessage };
});
// biome-ignore lint/complexity/noStaticOnlyClass: <explanation>
export class RabbitService extends Context.Tag("RabbitService")<
RabbitService,
Effect.Effect.Success<typeof make>
>() {
static Live = Layer.effect(this, make);
}
const program = Effect.gen(function* () {
const service = yield* RabbitService;
yield* service.sendMessage("my-queue", "Hello, World!");
yield* Stream.runForEach(service.getMessageStream("my-queue"), (msg) =>
Effect.gen(function* () {
yield* Console.log(msg.content.toString());
}),
);
}).pipe(Effect.provide(RabbitService.Live));
NodeRuntime.runMain(program);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment