@renatocron
Created June 27, 2023 04:54

Title: Batching requests and processing in parallel using NestJS

Hello, I have a use case where I want to batch incoming requests from multiple users in my NestJS application to reduce network usage and database queries and to optimize processing. Ideally, I'd like to store the requests and their response objects in an in-memory queue within the same process.

My requirements are:

  • Group incoming requests into a single transaction (e.g., 1000 requests within a second).
  • Perform batch operations in parallel to reduce the number of database queries.
  • Wait for a quorum to be reached for successful operations before sending responses.
  • Handle errors and timeouts without affecting other users' requests.
  • Keep the queue and processing control within the same process.

I've considered building a custom queue service to manage the queue, batch processing, and error handling. Are there any known solutions or best practices for implementing this kind of batching and parallel processing in NestJS? Any library or package recommendations are also welcome!


Here's a basic example of how you might implement a batching system in NestJS. This example demonstrates the key components of the system, although you should still handle error and timeout cases more thoroughly for production use:

Batch Service (batch.service.ts)

Create a custom queue service to manage the batching and parallel processing.

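import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common';

@Injectable()
export class BatchService implements OnModuleDestroy {
  private readonly queue: Array<{ item: number; response: any }> = [];
  private readonly timer: ReturnType<typeof setInterval>;

  constructor() {
    // Flush the queue once per second. Adjust the interval as needed.
    this.timer = setInterval(() => this.processQueue(), 1000);
  }

  // Stop the flush loop on shutdown so the timer does not keep the process alive.
  onModuleDestroy() {
    clearInterval(this.timer);
  }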

  addToQueue(item: number, response: any) {
    this.queue.push({ item, response });
  }

  async processQueue() {
    // Group requests into batches (here, the entire queue)
    const batch = this.queue.splice(0, this.queue.length);
    if (batch.length === 0) {
      return;
    }
    Logger.log(`Processing batch of size: ${batch.length}`);

    try {
      // Run batch operations in parallel
      const results = await Promise.all(batch.map(({ item }) => this.performOperation(item)));

      // Update responses
      // Destructure `item` as well: the messages below reference it.
      batch.forEach(({ item, response }, index) => {
        const success = results[index];
        if (success) {
          response.status(200).send({ message: `Operation for item ${item} was successful.` });
        } else {
          response.status(500).send({ message: `Operation for item ${item} has failed.` });
        }
      });
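    } catch (error) {
      Logger.error('Error processing the batch', error);
      // Promise.all rejects as soon as any operation throws, so answer every
      // request in the batch; otherwise the clients would hang until they time out.
      batch.forEach(({ response }) => {
        if (!response.headersSent) {
          response.status(500).send({ message: 'Batch processing failed.' });
        }
      });
    }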
  }

  // This function represents the operation you want to perform in parallel for each item.
  async performOperation(item: number): Promise<boolean> {
    // Perform your operation here
    // Example: a successful dummy operation that returns true
    return Promise.resolve(true);
  }
}
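
One of the stated requirements is that an error should not affect other users' requests, but Promise.all rejects the whole batch as soon as a single operation throws. A minimal sketch of isolating failures per item follows. The helper name settleEach and the Outcome type are hypothetical and not part of NestJS; the operation passed in stands for performOperation.

```typescript
// Outcome of one queued operation; `value` is set on success, `error` on failure.
interface Outcome<T, R> {
  item: T;
  ok: boolean;
  value?: R;
  error?: unknown;
}

// Hypothetical helper: runs `op` for every item in parallel and never rejects.
async function settleEach<T, R>(
  items: T[],
  op: (item: T) => Promise<R>,
): Promise<Outcome<T, R>[]> {
  return Promise.all(
    items.map((item) =>
      // Promise.resolve().then(...) also captures synchronous throws from `op`.
      Promise.resolve()
        .then(() => op(item))
        .then(
          (value): Outcome<T, R> => ({ item, ok: true, value }),
          (error: unknown): Outcome<T, R> => ({ item, ok: false, error }),
        ),
    ),
  );
}
```

Inside processQueue, each outcome could then be mapped to its own response: a 200 when ok is true and a 500 otherwise, so one failing item no longer fails its neighbours.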

Batch Module (batch.module.ts)

Create a module for the BatchService.

import { Module } from '@nestjs/common';
import { BatchService } from './batch.service';

@Module({
  providers: [BatchService],
  exports: [BatchService],
})
export class BatchModule {}

App Module (app.module.ts)

Add the BatchModule to app.module.ts.

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { BatchModule } from './batch/batch.module';

@Module({
  imports: [BatchModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Usage (app.controller.ts)

Add requests to the queue whenever they hit your endpoint.

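import { Controller, ParseIntPipe, Post, Query, Res } from '@nestjs/common';
import { Response } from 'express';
import { BatchService } from './batch/batch.service';

@Controller()
export class AppController {
  constructor(private readonly batchService: BatchService) {}

  // Query values arrive as strings; ParseIntPipe converts and validates the number.
  // With @Res(), Nest leaves sending the response to us; BatchService does it later.
  @Post('resource')
  addToBatch(@Query('item', ParseIntPipe) item: number, @Res() response: Response) {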
    this.batchService.addToQueue(item, response);
  }
}
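
Passing the Express response object into the queue works, but it ties the service to the HTTP layer. An alternative is to queue a promise per request and let the handler return it. The sketch below is framework-free and makes several assumptions: the class name PromiseBatcher, the handler callback (one batched call that returns results in input order), and the maxSize threshold are all hypothetical. It also flushes early once maxSize items are waiting, which is one way to cap a batch at, say, 1000 requests.

```typescript
// One waiting request: the item plus the callbacks that settle its promise.
type Pending<T, R> = {
  item: T;
  resolve: (value: R) => void;
  reject: (reason: unknown) => void;
};

// Hypothetical promise-based batcher; not part of NestJS.
class PromiseBatcher<T, R> {
  private pending: Pending<T, R>[] = [];
  private readonly handler: (items: T[]) => Promise<R[]>;
  private readonly maxSize: number;

  constructor(handler: (items: T[]) => Promise<R[]>, maxSize: number) {
    this.handler = handler;
    this.maxSize = maxSize;
  }

  // Queue an item and get a promise for its individual result.
  enqueue(item: T): Promise<R> {
    return new Promise<R>((resolve, reject) => {
      this.pending.push({ item, resolve, reject });
      if (this.pending.length >= this.maxSize) {
        void this.flush(); // size-triggered flush; flush() never rejects
      }
    });
  }

  // Process everything currently queued as one batch.
  async flush(): Promise<void> {
    const batch = this.pending.splice(0, this.pending.length);
    if (batch.length === 0) {
      return;
    }
    try {
      const results = await this.handler(batch.map((p) => p.item));
      if (results.length !== batch.length) {
        throw new Error('Handler returned a different number of results');
      }
      batch.forEach((p, i) => p.resolve(results[i] as R));
    } catch (error) {
      // The batched call failed as a whole, so every waiting caller is told.
      batch.forEach((p) => p.reject(error));
    }
  }
}
```

In a controller this would allow returning the promise from enqueue directly, since Nest awaits promises returned by handlers and sends the resolved value, so @Res() would no longer be needed. A timer calling flush() would still cover batches that never reach maxSize.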

This example demonstrates batch processing with an in-memory queue, performing parallel operations, responding to requests, and using a setInterval-based processing loop. The example is a starting point and can be improved by ensuring better error handling, timeouts, and other requirements you may have.
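
For the timeout part, one common approach is to race each operation against a timer. The following is a sketch under assumptions: withTimeout is a hypothetical helper name, and the underlying work is not cancelled when the timer wins; only the wait is abandoned.

```typescript
// Hypothetical helper: rejects if `work` has not settled within `ms` milliseconds.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  try {
    // Whichever promise settles first decides the outcome.
    return await Promise.race([work, timeout]);
  } finally {
    // Always clear the timer so it does not fire or linger after `work` wins.
    clearTimeout(timer);
  }
}
```

In the batch service this could wrap each call, for example withTimeout(this.performOperation(item), 500), where 500 is an arbitrary illustrative value, so that a slow item is reported as failed instead of holding up the rest of the batch.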
