Skip to content

Instantly share code, notes, and snippets.

@m5r
Created February 26, 2022 20:22
Show Gist options
  • Star 17 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save m5r/b2f1f0d044bba435d58aab67e82cf79b to your computer and use it in GitHub Desktop.
Save m5r/b2f1f0d044bba435d58aab67e82cf79b to your computer and use it in GitHub Desktop.
bullmq job queue in Remix
import notifierQueue from "~/queues/notifier.server.ts";
export const loader = async () => {
await notifierQueue.add("test", { emailAddress: "mokhtar@remixtape.dev" });
return null;
};
import { Queue } from "~/utils/queue.server";
type QueueData = {
emailAddress: string;
};
export const queue = Queue<QueueData>("notifier", async (job) => {
console.log(`Sending email to ${job.data.emailAddress}`);
// Delay 1 second to simulate sending an email, be it for user registration, a newsletter, etc.
await new Promise((resolve) => setTimeout(resolve, 1000));
console.log(`Email sent to ${job.data.emailAddress}`);
});
import {
type Processor,
Queue as BullQueue,
Worker,
QueueScheduler,
} from "bullmq";
import redis from "./redis.server";
type RegisteredQueue = {
queue: BullQueue;
worker: Worker;
scheduler: QueueScheduler;
};
declare global {
var __registeredQueues: Record<string, RegisteredQueue> | undefined;
}
const registeredQueues = global.__registeredQueues || (global.__registeredQueues = {});
export function Queue<Payload>(name: string, handler: Processor<Payload>): BullQueue<Payload> {
if (registeredQueues[name]) {
return registeredQueues[name].queue;
}
const queue = new BullQueue<Payload>(name, { connection: redis });
const worker = new Worker<Payload>(name, handler, { connection: redis });
const scheduler = new QueueScheduler(name, { connection: redis });
registeredQueues[name] = { queue, scheduler, worker };
return queue;
}
import Redis, { type Redis as RedisType, type RedisOptions } from "ioredis";
let redis: RedisType;
declare global {
var __redis: RedisType | undefined;
}
const redisOptions: RedisOptions = {
maxRetriesPerRequest: null,
enableReadyCheck: false,
};
// this is needed because in development we don't want to restart
// the server with every change, but we want to make sure we don't
// create a new connection to the Redis with every change either.
if (process.env.NODE_ENV === "production") {
redis = new Redis(process.env.REDIS_URL, redisOptions);
} else {
if (!global.__redis) {
global.__redis = new Redis(process.env.REDIS_URL, redisOptions);
}
redis = global.__redis;
}
export default redis;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment