Skip to content

Instantly share code, notes, and snippets.

@wermarter
Created May 26, 2023 02:50
Show Gist options
  • Save wermarter/31d7b8ac183bab79e3f98271a4ee0b75 to your computer and use it in GitHub Desktop.
Save wermarter/31d7b8ac183bab79e3f98271a4ee0b75 to your computer and use it in GitHub Desktop.
nestjs-cargo-queue
/**
 * Generic worker that funnels pushed tasks through a `cargoQueue`.
 *
 * Each time the queue hands over a batch of inputs, the batch is run through
 * `batchProcessor`, a debug line with a running batch counter is logged, and
 * the outputs are then passed to `batchCallback`.
 *
 * @typeParam TaskInput - Type of a single task pushed onto the queue.
 * @typeParam TaskOutput - Type of a single result produced by `batchProcessor`.
 */
export class BaseWorkerService<TaskInput, TaskOutput> {
  private queue: QueueObject<TaskInput>;
  private logger: Logger;
  private workCounter = 0;

  /**
   * @param workerName - Label embedded in the logger context as `worker[<name>]`.
   * @param concurrency - Forwarded to `cargoQueue` as its second argument.
   * @param batchSize - Forwarded to `cargoQueue` as its third argument.
   * @param batchProcessor - Turns one batch of inputs into a batch of outputs.
   * @param batchCallback - Receives the outputs after each processed batch.
   */
  constructor(
    workerName: string,
    concurrency: number,
    batchSize: number,
    batchProcessor: (input: TaskInput[]) => Promise<TaskOutput[]>,
    batchCallback: (output: TaskOutput[]) => Promise<void>,
  ) {
    this.logger = new Logger(`worker[${workerName}]`);

    // Worker handed to the queue: process, log, then deliver the outputs.
    const handleBatch = async (batch: TaskInput[]): Promise<void> => {
      const outputs = await batchProcessor(batch);
      // Count the batch before logging so the first line reads "[1]".
      this.workCounter += 1;
      this.logger.debug(
        `[${this.workCounter}] Processed ${outputs.length} tasks`,
      );
      await batchCallback(outputs);
    };

    this.queue = cargoQueue(handleBatch, concurrency, batchSize);
  }

  /**
   * Enqueues a single task.
   *
   * @param input - The task to add to the queue.
   * @returns A promise following the queue's `pushAsync` result for this task.
   */
  async push(input: TaskInput) {
    const pending = this.queue.pushAsync(input);
    return pending;
  }

  /**
   * Stops the worker by calling `kill()` on the underlying queue.
   *
   * @returns Whatever the queue's `kill()` returns.
   */
  terminate() {
    return this.queue.kill();
  }
}
/**
 * Demo worker: waits two seconds per batch, echoes the inputs back unchanged,
 * and prints every output to the console.
 */
const timeoutWorker = new BaseWorkerService(
  TIMEOUT_WORKER,
  2,
  2,
  async (inputs: string[]) => {
    // Simulate slow work before returning the batch as-is.
    const delayMs = 2000;
    await new Promise((resolve) => {
      setTimeout(resolve, delayMs);
    });
    return inputs;
  },
  async (outputs) => {
    for (const output of outputs) {
      console.log('--', output);
    }
  },
);
/**
 * Provider definition that binds the `TIMEOUT_WORKER` token to the
 * pre-built `timeoutWorker` instance via `useValue`.
 */
const timeoutProvider = {
provide: TIMEOUT_WORKER,
useValue: timeoutWorker,
};
/**
 * Module that registers and exports the timeout worker under the
 * `TIMEOUT_WORKER` token.
 *
 * `BaseWorkerService` is deliberately not listed as a class provider: its
 * constructor takes a name, numeric limits and callback functions that the
 * DI container cannot supply, so a bare class registration would fail to
 * resolve or produce an unusable instance. Concrete workers are registered
 * as value providers instead.
 */
@Module({
  imports: [],
  providers: [timeoutProvider],
  exports: [timeoutProvider],
})
export class WorkerModule {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment