Skip to content

Instantly share code, notes, and snippets.

@titouancreach
Created May 22, 2024 22:09
Show Gist options
  • Save titouancreach/0bbb117dfffc09c711b25c8c4e485598 to your computer and use it in GitHub Desktop.
Save titouancreach/0bbb117dfffc09c711b25c8c4e485598 to your computer and use it in GitHub Desktop.
import {
DeleteMessageCommand,
ReceiveMessageCommand,
} from "@aws-sdk/client-sqs";
import { Schema } from "@effect/schema";
import { SqsLive, SqsService, getSqslient } from "@repo/shared/src/Sqs";
import { Array, Effect, Option, Schedule, pipe } from "effect";
import { logDebug, logError, logWarning } from "effect/Effect";
function takeFromSqs({ queueUrl }: { queueUrl: string }) {
const takeEffect = Effect.gen(function* () {
const client = yield* getSqslient();
yield* logDebug("start receive message...");
const receiveMessgeResult = yield* Effect.promise(() =>
client.send(
new ReceiveMessageCommand({
WaitTimeSeconds: 5,
MaxNumberOfMessages: 1,
QueueUrl: queueUrl,
}),
),
);
return yield* pipe(
receiveMessgeResult.Messages,
Option.fromNullable,
Option.flatMap(Array.head),
);
});
return Effect.retry(takeEffect, Schedule.forever);
}
function initQueue({ queueName }: { queueName: string }) {
return Effect.gen(function* () {
const client = yield* SqsService;
yield* logDebug("Queue created with name: ", queueName);
return yield* client.createQueue({ queueName });
}).pipe(Effect.provide(SqsLive));
}
function deleteMessage({
messageId,
queueUrl,
}: { messageId: string; queueUrl: string }) {
return Effect.gen(function* () {
const client = yield* getSqslient();
const deletion = yield* Effect.promise(() =>
client.send(
new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: messageId,
}),
),
);
yield* logDebug("Message deleted");
return deletion;
});
}
function pollAndExecuteInfinite<T, U, Err>({
queueUrl,
processFunc,
schema,
}: {
queueUrl: string;
processFunc: (message: T) => Effect.Effect<void, Err, never>;
schema: Schema.Schema<T, U>;
}) {
return Effect.gen(function* () {
const message = yield* takeFromSqs({ queueUrl: queueUrl });
const decode = Schema.decodeUnknown(Schema.parseJson(schema));
const decodedBody = yield* decode(message.Body).pipe(
Effect.tapError(logWarning),
);
yield* logDebug("message received: ", message);
yield* processFunc(decodedBody)
.pipe(
Effect.andThen(() =>
Effect.gen(function* () {
if (message.ReceiptHandle) {
yield* deleteMessage({
messageId: message.ReceiptHandle,
queueUrl: queueUrl,
});
}
}),
),
)
.pipe(Effect.tapError(logError));
})
.pipe(Effect.retry(Schedule.forever))
.pipe(Effect.forever);
}
export function startConsumer<T, U, Err>({
queueName,
processFunc,
schema,
}: {
queueName: string;
processFunc: (message: T) => Effect.Effect<void, Err, never>;
schema: Schema.Schema<T, U>;
}) {
return Effect.gen(function* () {
const queueUrl = yield* initQueue({ queueName });
yield* pollAndExecuteInfinite({ queueUrl, processFunc, schema });
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment