Created
July 6, 2022 14:13
-
-
Save sambhav2612/76ff2ec4ab6a8153f8df7b4313b7c2eb to your computer and use it in GitHub Desktop.
nestjs bull queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BullModule.registerQueue({ | |
name: 'applicant', | |
}), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
OnQueueActive, | |
OnQueueCompleted, | |
OnQueueError, | |
OnQueueFailed, | |
Process, | |
Processor, | |
} from '@nestjs/bull'; | |
import { Job } from 'bull'; | |
import { AiService } from '../services/ai.service'; | |
import { Types } from 'mongoose'; | |
import { ApplicantService } from '../modules/applicant/applicant.service'; | |
import { NotificationService } from '../modules/notification/notification.service'; | |
import { QueueService } from '../modules/queue/queue.service'; | |
import { GatewayService } from '../services/gateway.service'; | |
import { Logger } from '@nestjs/common'; | |
import { v4 } from 'uuid'; | |
const { ObjectId } = Types; | |
@Processor('applicant') | |
export class ApplicantProcessor { | |
private readonly logger = new Logger(ApplicantProcessor.name); | |
constructor( | |
private readonly gatewayService: GatewayService, | |
private readonly notificationService: NotificationService, | |
private readonly applicantService: ApplicantService, | |
private readonly aiService: AiService, | |
private readonly queueService: QueueService, | |
) {} | |
@OnQueueError() | |
onError(error: Error) { | |
this.logger.log( | |
`Error Processing Datapool queue ${error.message}: ${JSON.stringify( | |
error.stack, | |
)}`, | |
); | |
} | |
@OnQueueFailed() | |
onFailed(job: Job, error: Error) { | |
this.logger.log( | |
`Failed Processing Datapool id ${job.data.userData.organization}... ${ | |
error.message | |
}: ${JSON.stringify(error.stack)}`, | |
); | |
} | |
@OnQueueActive() | |
onActive(job: Job) { | |
this.logger.log( | |
`Processing Datapool id ${job.data.userData.organization}...`, | |
); | |
} | |
@OnQueueCompleted() | |
onComplete(job: Job) { | |
this.logger.log( | |
`Completed Processing Datapool id ${job.data.userData.organization}...`, | |
); | |
} | |
@Process() | |
async processApplicantData(job: Job<object>) { | |
try { | |
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | |
// @ts-ignore | |
const userData = job.data.userData; | |
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | |
// @ts-ignore | |
const hasPlan = job.data.hasPlan; | |
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | |
// @ts-ignore | |
const totalUploaded = job.data.totalUploaded; | |
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | |
// @ts-ignore | |
const resumesUploaded = job.data.resumesUploaded; | |
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | |
// @ts-ignore | |
const files: Array<string> = job.data.files || []; | |
console.time(String(userData.organization)); | |
const queue = await this.queueService.create({ | |
type: 'DATAPOOL', | |
entityId: userData.organization, | |
startedAt: new Date(), | |
}); | |
let added = 0; | |
this.logger.log( | |
`Datapool resume upload request started at ${new Date().toLocaleString( | |
'en-IN', | |
)}`, | |
); | |
const parsedApplicants = | |
(await this.aiService.getApplicantDataFromResumes(files)) || []; | |
this.logger.log( | |
`Datapool resume upload request completed at ${new Date().toLocaleString( | |
'en-IN', | |
)}`, | |
); | |
for (const applicant of parsedApplicants) { | |
const olderApplication = await this.applicantService.findOlderApplicationForApplicantInDatapool( | |
applicant.email, | |
userData.organization, | |
); | |
if (!olderApplication || !applicant.email || !applicant.phoneNumber) { | |
++added; | |
const applicantDetails = await this.applicantService.createApplicant({ | |
primary: true, | |
name: applicant.name || '', | |
email: applicant.email || '', | |
mobile: applicant.phoneNumber || '', | |
organization: ObjectId(userData.organization), | |
resume: applicant.resumeUrl, | |
resumeText: applicant.resumeText, | |
status: 'NO_ACTION_TAKEN', | |
source: userData.name, | |
uploadedBy: ObjectId(userData._id), | |
}); | |
await this.applicantService.updateApplicantStageMapping( | |
applicantDetails?._id, | |
'NO_ACTION_TAKEN', | |
false, | |
); | |
if (applicant?.email) { | |
await this.applicantService.markAllOlderPrimaryApplicationsToSecondary( | |
applicantDetails._id, | |
); | |
} else { | |
await this.applicantService.update(applicantDetails._id, { | |
innerId: v4(), | |
}); | |
} | |
} | |
} | |
console.timeEnd(String(userData.organization)); | |
await this.queueService.update(queue._id, { | |
completedAt: new Date(), | |
}); | |
await this.notificationService.sendNotification({ | |
to: userData._id, | |
content: `${ | |
Array.from(parsedApplicants || []).length | |
} new applicants added to talent pool!`, | |
}); | |
this.gatewayService.sendDatapoolStatus( | |
userData.organization, | |
'Resumes successfully added to talent pool!', | |
hasPlan, | |
added, | |
Math.max(resumesUploaded - added, 0), | |
Math.max(totalUploaded - resumesUploaded, 0), | |
); | |
} catch (e) { | |
this.logger.error(e.message); | |
this.logger.error(JSON.stringify(e.stack)); | |
} finally { | |
// remove job from queue | |
await job.remove(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// inside constructor | |
@InjectQueue('applicant') private applicantQueue: Queue, | |
// method call | |
await this.applicantQueue.add( | |
{ | |
userData: { | |
_id: userDetails._id, | |
name: userDetails.name, | |
organization: userDetails.organization._id, | |
organizationName: userDetails.organization.name, | |
}, | |
totalUploaded, | |
resumesUploaded: toUpload, | |
files: links, | |
hasPlan: !!plan, | |
}, | |
{ removeOnComplete: true, removeOnFail: true }, | |
); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment