Skip to content

Instantly share code, notes, and snippets.

@tomfun
Last active September 29, 2023 08:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomfun/26f909789d168d44acbeed08f502cd91 to your computer and use it in GitHub Desktop.
Save tomfun/26f909789d168d44acbeed08f502cd91 to your computer and use it in GitHub Desktop.

Pessimistic Locking in Node.js with TypeORM

In this article, we will explore pessimistic locking, its main aspects, and how to implement it in Node.js using TypeORM. Pessimistic locking is a strategy to prevent concurrent access to a resource by obtaining exclusive access to the resource before making any changes. This approach ensures that other transactions cannot modify the locked resource until the lock is released.

We will use a practical example to demonstrate how to implement pessimistic locking in a Node.js application using the TypeORM library. Our example will focus on a payment processing scenario, where multiple processes might attempt to update the status of a payment concurrently.

Potential Sources of Errors

In the process of handling payments using an API and a database, there are several potential sources of errors that you may encounter:

  1. Network issues: Communication between your application and the API server or the database server might be interrupted or delayed due to network problems or server unavailability.

  2. Concurrency issues: Multiple transactions attempting to perform operations on the same records simultaneously can lead to race conditions, deadlocks, or other inconsistencies.

  3. API errors: The API server might return errors due to invalid inputs, insufficient funds, or other unexpected issues.

  4. Database errors: Problems with the database server, such as connection issues or query execution errors, can disrupt the payment processing flow.

To address these potential errors, it's essential to design a robust and fault-tolerant payment handling system. Implementing idempotent logic using database transactions, locking mechanisms, and proper error handling can help ensure that your system can recover from errors and continue processing payments correctly, even in the presence of retries or failures.

Introduction to Pessimistic Locking

Pessimistic locking is a technique used to prevent concurrent access to a resource, such as a row in a database. When a transaction needs to modify a resource, it first obtains a lock on the resource. Other transactions attempting to access the locked resource will either wait for the lock to be released or fail immediately, depending on the locking mode used.

Pessimistic locking can help ensure data consistency in scenarios where multiple transactions compete for the same resource. However, it can also lead to reduced performance and potential deadlocks, so it's essential to use it judiciously.

Implementing Pessimistic Locking with TypeORM

In our example, we have a PaymentEntity that represents a payment in our system. We want to update the status of a payment atomically and ensure that no other process can change the status while the update is in progress.

Here's the code that demonstrates how to implement pessimistic locking using TypeORM:

export class AccountingService {
  private logger = new Logger(AccountingService.name);

  @InjectEntityManager()
  private readonly manager: EntityManager;

  constructor() {
    // ... code to initialize variables and run the example
  }

  private async justRead(id: number | string, time: number) {
    // ... code to read the payment status without locking
  }

  private async run(id: number, time: number) {
    // ... code to update the payment status with pessimistic locking
  }
}

In this code snippet, we have an AccountingService class that uses the EntityManager from TypeORM to interact with the database. The run method will update the payment status using pessimistic locking, while the justRead method will read the payment status without acquiring any lock.

Let's take a closer look at the run method, which performs the following steps:

The run method addresses the main problem by performing the following steps within a separate database connection:

  1. Obtain a pessimistic lock on the PaymentEntity row we want to update.
  2. If the payment status is already set to ok or failed, skip the rest of the steps.
  3. Simulate an API call to check/read the status of the completed action using a setTimeout.
  4. If done is false, it means we don't have an API call yet. In this case, update the payment status based on the result of the simulated API call.
  5. If done is true, we have the result of an already made API call by another thread/process, and we can update the payment status accordingly.

This approach ensures that the status of the payment is updated atomically while preventing other processes from updating the same payment concurrently, thanks to the pessimistic lock.

await queryRunner.startTransaction();
const p = await m.findOne(PaymentEntity, {
  where: { uuid },
  lock: { mode: 'pessimistic_write' },
});
const done = // ...check if we already have called API in another process. e.g. "get payment by saved id"
if (!done) {
  // we will do API call now. e.g. "make a Payment"
  done = true;
  p.status = status;
  await m.save(p);
  await queryRunner.commitTransaction();
} else {
  const status = // get its operation status
  p.status = status;
  await m.save(p);
  await queryRunner.commitTransaction();
}

Here is close to reallife example:

import { EntityManager } from 'typeorm';

async function processPayment(entityManager: EntityManager, paymentId: number) {
  await entityManager.transaction(async (db) => {
    const payment = await db.findOne(PaymentEntity, paymentId, {
      lock: { mode: 'pessimistic_write' },
    });

    if (payment.status === 'completed' || payment.status === 'failed') {
      return;
    }

    const done = // ...check if we already have called API in another process. e.g. "get payment by saved id"
    if (!done) {
      // we will do API call now. e.g. "make a Payment"
      done = true;
      const apiCallId = await apiMakePayment(payment);
      payment.api_call_id = apiCallId;
      payment.status = apiCallId ? 'completed' : 'failed';
      await db.save(payment);
    } else {
      const status = // get its operation status
      payment.status = status;
      await db.save(payment);
    }
    await db.commitTransaction();
  });
}

// Fictional API call functions
async function apiCheckPaymentStatus(apiCallId: string): Promise<string> {
  // ...
}

async function apiMakePayment(payment: PaymentEntity): Promise<string> {
  // ...
}

Launch (stripped)

We do not simulate DB error, start from 'new' status, have 3 concurrent jobs and 1 non locking read:

await Promise.all([
  run(1, 15000),
  run(2, 5000),
  run(3, 1000),
  justRead('r', 500),
]);

The output

00:00 Server listening at http://0.0.0.0:3005

00:00 query: UPDATE "payment" SET "status" = $1, "updatedAt" = CURRENT_TIMESTAMP WHERE "uuid" = $2 -- PARAMETERS: ["new","e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]

00:00 begin game

00:00 query: START TRANSACTION
00:00 query: SELECT payment details WHERE "uuid" = $1 LIMIT 1 FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:00 1 got new
00:00 query: START TRANSACTION
00:00 query: SELECT payment details WHERE "uuid" = $1 LIMIT 1 FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:00 query: START TRANSACTION
00:00 query: SELECT payment details WHERE "uuid" = $1 LIMIT 1 FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]

00:00 query: SELECT payment details WHERE "uuid" = $1 LIMIT 1 -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:00 r jstread got new

00:15 1 API call: OK
00:15 query: SELECT payment details WHERE "uuid" IN ($1) -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:15 query: UPDATE "payment" SET "status" = $1, "updatedAt" = CURRENT_TIMESTAMP WHERE "uuid" IN ($2) RETURNING "updatedAt" -- PARAMETERS: ["payed","e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:15 query: COMMIT
00:15 1 done

00:15 3 got payed
00:15 3 initially skipped
00:15 query: COMMIT

00:15 2 got payed
00:15 2 initially skipped
00:15 query: COMMIT

Already processed

start from completed task

      // this.manager.update(
      //   PaymentEntity,
      //   { uuid: 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac' },
      //   { status: PaymentStatus.NEW },
      // );

Will give

00:00 | begin game

00:00 | query: START TRANSACTION
00:00 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:00 | 1 got payed
00:00 | 1 initially skipped
00:00 | query: COMMIT

00:00 | query: START TRANSACTION
00:00 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:00 | query: START TRANSACTION
00:00 | 2 got payed
00:00 | 2 initially skipped
00:00 | query: COMMIT
00:00 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:00 | 3 got payed
00:00 | 3 initially skipped
00:00 | query: COMMIT

00:01 | query: SELECT ... FROM "payment" ... -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:01 | r jstread got payed
  • The game begins, and multiple transactions are initiated.
  • Transactions 1, 2, and 3 get payed and initially skipped.
  • Transaction 1, 2, and 3 are committed.
  • A read-only transaction 'r' reads the payment information.

DB Error

// To simulate DB error:
// uncomment
queryRunner.release();
this.logger.log(`\n${id} simulate db error`);
// comment
// await m.save(p);

This code simulates a database error by releasing the query runner before attempting to save the payment entity. The logger logs a message with the id and the text "simulate db error" to indicate that the error has been simulated.

The second block of code simulates a new fresh job:

// To simulate new fresh job:
this.manager.update(
  PaymentEntity,
  { uuid: 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac' },
  { status: PaymentStatus.NEW },
);
await new Promise((r) => setTimeout(r, 500));
this.logger.log(`\nbegin game\n`);
await Promise.all([
  run(1, 15000),
  run(2, 5000),
  run(3, 1000),
  justRead('r', 500),
]).catch((e) => setTimeout(() => console.log(e), 5000)); // log error delayed
00:00 | begin game

00:01 | query: START TRANSACTION
00:01 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:01 | 1 got new

00:01 | query: START TRANSACTION
00:01 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:01 | query: START TRANSACTION
00:01 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]

00:01 | query: SELECT ... FROM "payment" ... -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:01 | r jstread got new

00:15 | 1 API call: OK
00:15 | 1 simulate db error
00:15 | QueryRunnerAlreadyReleasedError: Query runner already released. Cannot run queries anymore.

00:25 | 3 got new
00:26 | 3 API already affected - switch status OK
00:26 | query: SELECT ... FROM "payment" ... -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:26 | query: UPDATE "payment" SET "status" = $1, "updatedAt" = CURRENT_TIMESTAMP WHERE "uuid" IN ($2) RETURNING "updatedAt" -- PARAMETERS: ["payed","e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"]
00:26 | query: COMMIT
00:26 | 2 got payed
00:26 | 2 initially skipped
00:26 | query: COMMIT

Optimistic Locking

Optimistic locking is a concurrency control technique that assumes multiple transactions can complete without affecting each other. It only checks for conflicts when the transaction is ready to commit. If a conflict is detected, the transaction is rolled back and must be retried.

Pessimistic Locking

Pessimistic Write

This mode locks the selected records for update, preventing other transactions from modifying them until the lock is released. Other transactions trying to update the locked records will have to wait for the lock to be released.

SELECT ... FROM "payment" ... FOR UPDATE;

Pessimistic Write or Fail

This mode attempts to lock the selected records for update. If any of the records are already locked, the query will fail immediately.

SELECT ... FROM "payment" ... FOR UPDATE NOWAIT;

Pessimistic Partial Write

This mode attempts to lock the selected records for update. If any of the records are already locked, it will return the remaining unlocked records and skip the locked ones.

SELECT ... FROM "payment" ... FOR UPDATE SKIP LOCKED;

Pessimistic Read

This mode allows multiple transactions to read the same record simultaneously but prevents any updates to the record until the lock is released. It leads to deadlocks in our example.

SELECT ... FROM "payment" ... FOR SHARE;
import { Logger } from '@nestjs/common';
import { InjectEntityManager } from '@nestjs/typeorm/dist/common/typeorm.decorators';
import { EntityManager } from 'typeorm';
import { QueryRunner } from 'typeorm/query-runner/QueryRunner';
import {
PaymentEntity,
PaymentStatus,
} from '../entity';
export class AccountingService {
private logger = new Logger(AccountingService.name);
@InjectEntityManager()
private readonly manager: EntityManager;
constructor() {
const uuid = 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac';
const OK = PaymentStatus.PAYED;
const FAILED = PaymentStatus.REFUNDED;
const status = Math.random() > 0.8 ? FAILED : OK;
const statusText = status === OK ? 'OK' : 'FAILED';
let done = false;
const justRead = async (id: number | string, time: number) => {
const queryRunner = this.manager.connection.createQueryRunner();
await queryRunner.connect();
try {
const m = queryRunner.manager;
await new Promise((r) => setTimeout(r, time));
const p = await m.findOne(PaymentEntity, {
where: { uuid: 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac' },
});
this.logger.log(`\n${id} jstread got ${p && p.status}`);
} finally {
await queryRunner.release();
}
};
const run = async (id: number, time: number) => {
const queryRunner = this.manager.connection.createQueryRunner();
await queryRunner.connect();
try {
const m = queryRunner.manager;
await queryRunner.startTransaction();
/*
pessimistic_write = SELECT ... FOR UPDATE - wait for commit
pessimistic_write_or_fail = SELECT ... FOR UPDATE NOWAIT - fail if can't lock
pessimistic_partial_write = SELECT ... FOR UPDATE SKIP LOCKED - return null
pessimistic_read = SELECT ... FOR SHARE - read & deadlocks :(
*/
const p = await m.findOne(PaymentEntity, {
where: { uuid },
lock: { mode: 'pessimistic_write' },
});
this.logger.log(`\n${id} got ${p && p.status}`);
if (p.status === OK || p.status === FAILED) {
this.logger.log(`\n${id} initially skipped`);
await queryRunner.commitTransaction();
return;
}
await new Promise((r) => setTimeout(r, time));
if (!done) {
done = true;
p.status = status;
this.logger.log(`\n${id} API call: ${statusText}`);
// To simulate DB error:
// uncomment
// queryRunner.release();
// this.logger.log(`\n${id} simulate db error`);
// comment
await m.save(p);
await queryRunner.commitTransaction();
this.logger.log(`\n${id} done`);
} else {
this.logger.log(
`\n${id} API already affected - switch status ${statusText}`,
);
p.status = status;
await m.save(p);
await queryRunner.commitTransaction();
return;
}
} finally {
await queryRunner.release();
}
};
setTimeout(async () => {
// To simulate new fresh job:
this.manager.update(
PaymentEntity,
{ uuid: 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac' },
{ status: PaymentStatus.NEW },
);
await new Promise((r) => setTimeout(r, 500));
this.logger.log(`\nbegin game\n`);
await Promise.all([
run(1, 15000),
run(2, 5000),
run(3, 1000),
justRead('r', 500),
]).catch((e) => setTimeout(() => console.log(e), 5000));
}, 2000);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment