Created
August 20, 2023 22:13
-
-
Save yharaskrik/522cca342ee32d2bf57e8a79c7e7c414 to your computer and use it in GitHub Desktop.
Bull Graceful Shutdown NestJS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { BullModule } from '@nestjs/bull'; | |
import { Module } from '@nestjs/common'; | |
import { QueueGracefulShutdown } from './queue-graceful-shutdown'; | |
/** Name the Bull queue is registered under; also used to inject it for graceful shutdown. */
const QUEUE_NAME = 'my-queue';
/**
 * Registers the Bull queue named by `QUEUE_NAME` and re-exports `BullModule`
 * so importing modules can use the queue.
 *
 * Extends the class produced by `QueueGracefulShutdown(QUEUE_NAME)`, so the
 * shutdown behaviour of that base class is attached to this module for the
 * same queue.
 */
@Module({
  imports: [
    BullModule.registerQueue({
      name: QUEUE_NAME,
    }),
  ],
  exports: [BullModule],
})
export class MyQueueModule extends QueueGracefulShutdown(QUEUE_NAME) {}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { InjectQueue } from '@nestjs/bull'; | |
import type { OnModuleDestroy } from '@nestjs/common'; | |
import { Logger } from '@nestjs/common'; | |
import type { Queue } from 'bull'; | |
/**
 * Builds a base class that drains a Bull queue when the Nest provider or
 * module extending it is destroyed.
 *
 * When `onModuleDestroy` runs, the returned class pauses the queue locally
 * and then waits for the jobs this worker is currently processing to finish,
 * so shutdown does not proceed while work is still in flight.
 *
 * @param queueName - Name of the Bull queue to inject into the returned class.
 * @returns A class to extend; its constructor receives the injected queue.
 */
// eslint-disable-next-line @typescript-eslint/naming-convention
export function QueueGracefulShutdown(queueName: string) {
  /** Pauses `queue` for this process (if not already paused) and waits for its active jobs. */
  async function pauseAndDrain(queue: Queue, logger: Logger): Promise<void> {
    if (await queue.isPaused(true)) {
      logger.log('Already paused');
    } else {
      // Only pause this server from processing, not the entire queue across all servers.
      // This stops the queue from taking new jobs but allows active ones to finish.
      logger.log('Pausing for shutdown.');
      await queue.pause(true);
      logger.log('Paused');
    }

    // Once all jobs are finished we can allow the server to shut down.
    logger.log('Waiting for this workers job to finish.');
    await queue.whenCurrentJobsFinished();
    // No second argument here: Nest's Logger treats a trailing string as the
    // context, which would replace the `Shutdown: <queue>` prefix on this line.
    logger.log('All jobs for this worker have finished.');
  }

  class AbstractQueueShutdown implements OnModuleDestroy {
    readonly logger: Logger;

    /** True once shutdown has been requested for this instance. */
    destroying = false;

    /**
     * The in-flight (or settled) shutdown, shared by every `onModuleDestroy`
     * caller. Kept public because members of a class returned from an exported
     * function cannot be private when declaration files are emitted.
     */
    shutdown?: Promise<void>;

    constructor(@InjectQueue(queueName) readonly queue: Queue) {
      this.logger = new Logger(`Shutdown: ${this.queue.name}`);
    }

    /**
     * Pauses the queue locally and resolves once this worker's active jobs
     * have finished. Repeated or concurrent calls do not start a second
     * shutdown; they wait on the first one instead of returning early.
     *
     * @returns A promise that settles when the shutdown sequence has completed.
     */
    async onModuleDestroy(): Promise<void> {
      if (!this.destroying) {
        this.destroying = true;
        this.shutdown = pauseAndDrain(this.queue, this.logger);
      }

      // `shutdown` is undefined only if `destroying` was set externally; in
      // that case this resolves immediately, as the original guard did.
      await this.shutdown;
    }
  }

  return AbstractQueueShutdown;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment