Skip to content

Instantly share code, notes, and snippets.

@titouancreach
Created May 21, 2024 17:46
Show Gist options
  • Save titouancreach/bfd6b74cec913ac97079bdd0c655bd24 to your computer and use it in GitHub Desktop.
Save titouancreach/bfd6b74cec913ac97079bdd0c655bd24 to your computer and use it in GitHub Desktop.
import { NodeRuntime } from "@effect/platform-node";
import { Schema } from "@effect/schema";
import { SqsLive, SqsService } from "@repo/shared/src/Sqs";
import { Chunk, Console, Effect, Option, Stream, pipe } from "effect";
import { logError, logInfo } from "effect/Effect";
import { Consumer } from "sqs-consumer";
function makeStream<T, U>({
queueName,
schema,
}: { queueName: string; schema: Schema.Schema<T, U> }) {
return Effect.gen(function* () {
const sqsService = yield* SqsService;
const url = yield* sqsService.createQueue({ queueName });
const stream = Stream.async<T>((emit) => {
const startConsumerEffect = Effect.gen(function* () {
const consumer = Consumer.create({
queueUrl: url,
handleMessage: async (message) => {
const sendMessageToStreamEffect = Effect.gen(function* () {
const chunk = yield* pipe(
message.Body,
Option.fromNullable,
Option.map(JSON.parse), // probably better
Effect.flatMap(Schema.decodeUnknown(schema)),
);
yield* logInfo(`[${queueName}] ===> `, chunk);
emit(Effect.succeed(Chunk.of(chunk)));
});
// let it throw here. sqs-consumer will handle it and push the message back to the queue
await Effect.runPromise(
sendMessageToStreamEffect
.pipe(
Effect.tapErrorTag("ParseError", (err) =>
// Since ParseError.message is already well formated, don't break the formating with logError.
Console.error(err.message),
),
)
.pipe(Effect.tapErrorTag("NoSuchElementException", logError)),
);
},
});
yield* logInfo("Starting consumer for queue: ", queueName);
consumer.start();
});
Effect.runSync(startConsumerEffect);
});
return stream;
}).pipe(Effect.provide(SqsLive));
}
const effect = makeStream({
queueName: "MyLazyQueueName",
schema: Schema.Struct({
numeroDeTel: Schema.String,
}),
});
const program = effect.pipe(
Effect.andThen((stream) =>
stream.pipe(
Stream.runForEach((chunk) =>
Console.log("received message from the stream", chunk.numeroDeTel),
),
),
),
);
NodeRuntime.runMain(pipe(Effect.all([program]), Effect.andThen(Effect.never)));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment