Skip to content

Instantly share code, notes, and snippets.

@sambhav2612
Created July 6, 2022 14:13
Show Gist options
  • Save sambhav2612/76ff2ec4ab6a8153f8df7b4313b7c2eb to your computer and use it in GitHub Desktop.
Save sambhav2612/76ff2ec4ab6a8153f8df7b4313b7c2eb to your computer and use it in GitHub Desktop.
nestjs bull queue
BullModule.registerQueue({
name: 'applicant',
}),
import {
OnQueueActive,
OnQueueCompleted,
OnQueueError,
OnQueueFailed,
Process,
Processor,
} from '@nestjs/bull';
import { Job } from 'bull';
import { AiService } from '../services/ai.service';
import { Types } from 'mongoose';
import { ApplicantService } from '../modules/applicant/applicant.service';
import { NotificationService } from '../modules/notification/notification.service';
import { QueueService } from '../modules/queue/queue.service';
import { GatewayService } from '../services/gateway.service';
import { Logger } from '@nestjs/common';
import { v4 } from 'uuid';
const { ObjectId } = Types;
@Processor('applicant')
export class ApplicantProcessor {
private readonly logger = new Logger(ApplicantProcessor.name);
constructor(
private readonly gatewayService: GatewayService,
private readonly notificationService: NotificationService,
private readonly applicantService: ApplicantService,
private readonly aiService: AiService,
private readonly queueService: QueueService,
) {}
@OnQueueError()
onError(error: Error) {
this.logger.log(
`Error Processing Datapool queue ${error.message}: ${JSON.stringify(
error.stack,
)}`,
);
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
this.logger.log(
`Failed Processing Datapool id ${job.data.userData.organization}... ${
error.message
}: ${JSON.stringify(error.stack)}`,
);
}
@OnQueueActive()
onActive(job: Job) {
this.logger.log(
`Processing Datapool id ${job.data.userData.organization}...`,
);
}
@OnQueueCompleted()
onComplete(job: Job) {
this.logger.log(
`Completed Processing Datapool id ${job.data.userData.organization}...`,
);
}
@Process()
async processApplicantData(job: Job<object>) {
try {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const userData = job.data.userData;
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const hasPlan = job.data.hasPlan;
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const totalUploaded = job.data.totalUploaded;
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const resumesUploaded = job.data.resumesUploaded;
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const files: Array<string> = job.data.files || [];
console.time(String(userData.organization));
const queue = await this.queueService.create({
type: 'DATAPOOL',
entityId: userData.organization,
startedAt: new Date(),
});
let added = 0;
this.logger.log(
`Datapool resume upload request started at ${new Date().toLocaleString(
'en-IN',
)}`,
);
const parsedApplicants =
(await this.aiService.getApplicantDataFromResumes(files)) || [];
this.logger.log(
`Datapool resume upload request completed at ${new Date().toLocaleString(
'en-IN',
)}`,
);
for (const applicant of parsedApplicants) {
const olderApplication = await this.applicantService.findOlderApplicationForApplicantInDatapool(
applicant.email,
userData.organization,
);
if (!olderApplication || !applicant.email || !applicant.phoneNumber) {
++added;
const applicantDetails = await this.applicantService.createApplicant({
primary: true,
name: applicant.name || '',
email: applicant.email || '',
mobile: applicant.phoneNumber || '',
organization: ObjectId(userData.organization),
resume: applicant.resumeUrl,
resumeText: applicant.resumeText,
status: 'NO_ACTION_TAKEN',
source: userData.name,
uploadedBy: ObjectId(userData._id),
});
await this.applicantService.updateApplicantStageMapping(
applicantDetails?._id,
'NO_ACTION_TAKEN',
false,
);
if (applicant?.email) {
await this.applicantService.markAllOlderPrimaryApplicationsToSecondary(
applicantDetails._id,
);
} else {
await this.applicantService.update(applicantDetails._id, {
innerId: v4(),
});
}
}
}
console.timeEnd(String(userData.organization));
await this.queueService.update(queue._id, {
completedAt: new Date(),
});
await this.notificationService.sendNotification({
to: userData._id,
content: `${
Array.from(parsedApplicants || []).length
} new applicants added to talent pool!`,
});
this.gatewayService.sendDatapoolStatus(
userData.organization,
'Resumes successfully added to talent pool!',
hasPlan,
added,
Math.max(resumesUploaded - added, 0),
Math.max(totalUploaded - resumesUploaded, 0),
);
} catch (e) {
this.logger.error(e.message);
this.logger.error(JSON.stringify(e.stack));
} finally {
// remove job from queue
await job.remove();
}
}
}
// inside constructor
@InjectQueue('applicant') private applicantQueue: Queue,
// method call
await this.applicantQueue.add(
{
userData: {
_id: userDetails._id,
name: userDetails.name,
organization: userDetails.organization._id,
organizationName: userDetails.organization.name,
},
totalUploaded,
resumesUploaded: toUpload,
files: links,
hasPlan: !!plan,
},
{ removeOnComplete: true, removeOnFail: true },
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment