Skip to content

Instantly share code, notes, and snippets.

@wodCZ
Created January 21, 2024 16:49
Show Gist options
  • Save wodCZ/5cf2537b849fe1c07c7f478feb5b7505 to your computer and use it in GitHub Desktop.
Save wodCZ/5cf2537b849fe1c07c7f478feb5b7505 to your computer and use it in GitHub Desktop.
Nest separate Queue and Processor structure
import { Module } from '@nestjs/common';
import { BullModule, BullRootModuleOptions } from '@nestjs/bull';
import { QueuesService } from './queues.service';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AppConfig } from '../config/schema';
@Module({
imports: [
BullModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory(configService: AppConfig) {
const redisUrl = configService.get('REDIS_URL', { infer: true });
return {
url: redisUrl,
redis: redisUrl.startsWith('rediss://')
? { tls: { requestCert: true } }
: undefined,
} as BullRootModuleOptions;
},
}),
...QueuesService.QUEUES.map((queue) =>
BullModule.registerQueue({
name: queue.queueName,
defaultJobOptions: queue.defaultJobOptions,
}),
),
],
exports: [QueuesService],
providers: [QueuesService],
})
export class QueuesModule {}
import {Injectable} from '@nestjs/common';
import {JobOptions, Queue} from 'bull';
import {InjectQueue} from '@nestjs/bull';
import {StorageQueue} from '../worker/storage.queue';
@Injectable()
export class QueuesService {
static QUEUES: {
queueName: string;
defaultJobOptions?: JobOptions;
}[] = [
StorageQueue,
];
constructor(
@InjectQueue(StorageQueue.queueName)
private readonly storageQueue: Queue,
) {
}
public get storage() {
return StorageQueue.makeFactories(this.storageQueue);
}
}
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { Media } from '@prisma/client';
import { StorageService } from '../storage/storage.service';
import { StorageQueue } from './storage.queue';
@Processor(StorageQueue.queueName)
export class StorageProcessor {
constructor(private readonly storageService: StorageService) {}
@Process(StorageQueue.JOB_VERIFY)
async verify(job: Job<Media>) {
const verified = await this.storageService.verify(job.data);
if (!verified) {
throw new Error(`Media ${job.data.fileName} is not yet uploaded.`);
}
return true;
}
}
import { JobOptions, Queue } from 'bull';
import { Media } from '@prisma/client';
export class StorageQueue {
static queueName = 'storage';
static defaultJobOptions: JobOptions = {
removeOnComplete: { age: 86400 * 5 },
};
static JOB_VERIFY = 'verify';
static makeFactories(queue: Queue) {
return {
verify(media: Media) {
return queue.add(StorageQueue.JOB_VERIFY, media, {
attempts: 100,
backoff: {
type: 'fixed',
delay: 10000,
},
});
},
};
}
}
import {Module} from '@nestjs/common';
import {WorkerService} from './worker.service';
import {StorageProcessor} from './storage.processor';
import {ConfigModule} from '@nestjs/config';
import {schema} from '../config/schema';
import {QueuesModule} from '../queues/queues.module';
@Module({
imports: [
ConfigModule.forRoot({
envFilePath: '.env.local',
isGlobal: true,
cache: true,
validationSchema: schema,
expandVariables: true,
}),
QueuesModule,
],
providers: [
WorkerService,
StorageProcessor,
],
})
export class WorkerModule {
}
import {Injectable, Logger, OnModuleDestroy} from '@nestjs/common';
@Injectable()
export class WorkerService implements OnModuleDestroy {
private readonly promise;
private resolve?: (value?: any) => void;
private reject?: (reason?: any) => void;
private server;
constructor() {
this.promise = new Promise((resolve, reject) => {
Logger.log('Waiting for jobs', 'WorkerService');
this.resolve = resolve;
this.reject = reject;
});
}
async work() {
return this.promise;
}
onModuleDestroy() {
Logger.log('Shutting down', 'WorkerService');
this.server.close();
this.resolve && this.resolve();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment