@CMCDragonkai
Last active June 22, 2022 02:14
Read Write Lock #javascript #typescript #concurrency
import type { MutexInterface } from 'async-mutex';
import { Mutex } from 'async-mutex';

/**
 * Single threaded read-preferring read write lock
 */
class RWLock {
  protected _readerCount: number = 0;
  protected _writerCount: number = 0;
  protected lock: Mutex = new Mutex();
  protected release: MutexInterface.Releaser;

  public get readerCount(): number {
    return this._readerCount;
  }

  public get writerCount(): number {
    return this._writerCount;
  }

  public async withRead<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireRead();
    try {
      return await f();
    } finally {
      release();
    }
  }

  public async withWrite<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireWrite();
    try {
      return await f();
    } finally {
      release();
    }
  }

  public async acquireRead(): Promise<() => void> {
    const readerCount = ++this._readerCount;
    // The first reader locks
    if (readerCount === 1) {
      this.release = await this.lock.acquire();
    }
    return () => {
      const readerCount = --this._readerCount;
      // The last reader unlocks
      if (readerCount === 0) {
        this.release();
      }
    };
  }

  public async acquireWrite(): Promise<() => void> {
    ++this._writerCount;
    this.release = await this.lock.acquire();
    return () => {
      --this._writerCount;
      this.release();
    };
  }

  public isLocked(): boolean {
    return this.lock.isLocked();
  }

  public async waitForUnlock(): Promise<void> {
    return this.lock.waitForUnlock();
  }
}

import type { MutexInterface } from 'async-mutex';
import { Mutex } from 'async-mutex';

/**
 * Single threaded write-preferring read write lock
 */
class RWLock {
  protected readersLock: Mutex = new Mutex();
  protected writersLock: Mutex = new Mutex();
  protected readersRelease: MutexInterface.Releaser;
  protected readerCountBlocked: number = 0;
  protected _readerCount: number = 0;
  protected _writerCount: number = 0;

  public get readerCount(): number {
    return this._readerCount + this.readerCountBlocked;
  }

  public get writerCount(): number {
    return this._writerCount;
  }

  public async withRead<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireRead();
    try {
      return await f();
    } finally {
      release();
    }
  }

  public async withWrite<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireWrite();
    try {
      return await f();
    } finally {
      release();
    }
  }

  public async acquireRead(): Promise<() => void> {
    if (this._writerCount > 0) {
      ++this.readerCountBlocked;
      await this.writersLock.waitForUnlock();
      --this.readerCountBlocked;
    }
    const readerCount = ++this._readerCount;
    // The first reader locks
    if (readerCount === 1) {
      this.readersRelease = await this.readersLock.acquire();
    }
    return () => {
      const readerCount = --this._readerCount;
      // The last reader unlocks
      if (readerCount === 0) {
        this.readersRelease();
      }
    };
  }

  public async acquireWrite(): Promise<() => void> {
    ++this._writerCount;
    const writersRelease = await this.writersLock.acquire();
    this.readersRelease = await this.readersLock.acquire();
    return () => {
      this.readersRelease();
      writersRelease();
      --this._writerCount;
    };
  }

  public isLocked(): boolean {
    return this.readersLock.isLocked() || this.writersLock.isLocked();
  }

  public async waitForUnlock(): Promise<void> {
    await Promise.all([
      this.readersLock.waitForUnlock(),
      this.writersLock.waitForUnlock()
    ]);
    return;
  }
}

@CMCDragonkai
Author

CMCDragonkai commented Dec 28, 2021

When using the read-preferring RWLock, we get this behaviour:

    const l1 = lock.withRead(async () => {
      process.stderr.write('read1\n');
    });
    const l2 = lock.withRead(async () => {
      process.stderr.write('read2\n');
    });
    const l3 = lock.withWrite(async () => {
      process.stderr.write('write1\n');
    });
    const l4 = lock.withRead(async () => {
      process.stderr.write('read3\n');
    });
    const l5 = lock.withRead(async () => {
      process.stderr.write('read4\n');
    });
    const l6 = lock.withWrite(async () => {
      process.stderr.write('write2\n');
    });
    await l1;
    await l2;
    await l3;
    await l4;
    await l5;
    await l6;

Output:

read2
read3
read4
read1
write1
write2

Notice that the reads are grouped together, so sustained read pressure will starve writes. Notice also that read1 completes last: it has more work to do in acquiring and releasing the underlying mutex, while the other read operations only increment and decrement the counter.
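
A consequence of later readers only touching the counter is that, as the snippet is written, they appear to return from acquireRead before the first reader actually holds the mutex, so they could run while a writer still holds it. The following is a minimal sketch of one way to close that gap, in which every reader awaits the same pending acquisition. It is not the author's implementation: `TinyMutex` (a dependency-free stand-in for the async-mutex `Mutex`), `SharedAcquireRWLock` and `demo` are hypothetical names introduced only for illustration.

```typescript
// Hypothetical minimal FIFO mutex, standing in for async-mutex's Mutex
// so that the sketch runs without dependencies.
class TinyMutex {
  private tail: Promise<void> = Promise.resolve();

  // Resolves with a release function once every earlier holder has released.
  public acquire(): Promise<() => void> {
    let release!: () => void;
    const released = new Promise<void>((resolve) => {
      release = () => resolve();
    });
    const acquired = this.tail.then(() => release);
    this.tail = released;
    return acquired;
  }
}

// Read-preferring variant in which every reader awaits the shared acquisition.
class SharedAcquireRWLock {
  private readerCount = 0;
  private mutex = new TinyMutex();
  private pending: Promise<() => void> | null = null;

  public async acquireRead(): Promise<() => void> {
    if (++this.readerCount === 1) {
      // The first reader starts acquiring the mutex
      this.pending = this.mutex.acquire();
    }
    // Every reader, not just the first, waits until the mutex is held
    const releaseMutex = await this.pending!;
    return () => {
      // The last reader unlocks
      if (--this.readerCount === 0) {
        releaseMutex();
      }
    };
  }

  public acquireWrite(): Promise<() => void> {
    return this.mutex.acquire();
  }
}

// A writer holds the lock while two readers queue up behind it.
async function demo(): Promise<string[]> {
  const lock = new SharedAcquireRWLock();
  const log: string[] = [];
  const releaseWrite = await lock.acquireWrite();
  const r1 = lock.acquireRead().then((release) => {
    log.push('read1');
    release();
  });
  const r2 = lock.acquireRead().then((release) => {
    log.push('read2');
    release();
  });
  // Give the readers a full turn of the event loop to (wrongly) sneak in
  await new Promise<void>((resolve) => setTimeout(resolve, 0));
  log.push('write-done');
  releaseWrite();
  await Promise.all([r1, r2]);
  return log;
}
```

In this sketch neither reader is logged until the writer has released. The trade-off is that readers joining an existing group pay an extra `await` even when the mutex is already held by readers.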

@CMCDragonkai
Author

Using the same operations on the write-preferring lock we get this output instead:

read2
read1
write1
read4
read3
write2

At first I thought it would be an unbiased rwlock, because either the subsequent readers or the writers could acquire the readers lock. However, because the writer has less work to do to acquire the write lock, the writers always win and block any subsequent readers. In any case, this lets writers get in even under sustained pressure from readers.

@eblocha

eblocha commented Jan 16, 2022

I was able to get the reads/writes to happen in order, using an async no-op. I'm still not sure if this will fail if the number of ticks to .acquire() the lock changes.

/** Forces the awaiting function to pause and resume behind work that is already queued */
const asyncNOP = async () => new Promise((resolve) => resolve(undefined));

We can use this in acquireRead to ensure order is maintained:

public async acquireRead(): Promise<() => void> {
  if (this._writerCount > 0) {
    ++this.readerCountBlocked;
    const writerCount = this._writerCount;
    // Wait for every writer that came before us to unlock, not just the first
    for (let i = 0; i < writerCount; i++) {
      await this.writersLock.waitForUnlock();
    }
    --this.readerCountBlocked;
  }
  const readerCount = ++this._readerCount;
  // The first reader locks
  if (readerCount === 1) {
    this.readersRelease = await this.readersLock.acquire();
  } else {
    // To ensure we use the same number of event loop ticks
    // whether we need to acquire the lock or not
    await asyncNOP();
  }
  return () => {
    const readerCount = --this._readerCount;
    // The last reader unlocks
    if (readerCount === 0) {
      this.readersRelease();
    }
  };
}

Notice that we also need to .waitForUnlock() for every writer before us now, otherwise we will just be waiting on the first writer that was before us.

Test cases, as a bonus (adds the index to the type, e.g. ['read0', 'read1', 'write2', ... ]):

enum LockTypes {
  READ = 'read',
  WRITE = 'write',
}

type TestCase = {
  locks: LockTypes[];
};

const tests: TestCase[] = [
  { locks: [LockTypes.READ, LockTypes.WRITE, LockTypes.READ] },
  { locks: [LockTypes.WRITE, LockTypes.READ] },
  { locks: [LockTypes.READ, LockTypes.WRITE] },
  {
    locks: [
      LockTypes.READ,
      LockTypes.READ,
      LockTypes.WRITE,
      LockTypes.READ,
      LockTypes.READ,
      LockTypes.WRITE,
      LockTypes.READ,
    ],
  },
];

it.each(tests)('Maintains order: $locks', async ({ locks }) => {
  const lock = new RWLock();
  const data: string[] = [];

  const expected = locks.map((type, index) => type + index.toString());

  const fn = async (index: number) => data.push(expected[index]);
  await Promise.all(
    locks.map((type, index) =>
      type === LockTypes.READ
        ? lock.withRead(() => fn(index))
        : lock.withWrite(() => fn(index))
    )
  );
  expect(data).toStrictEqual(expected);
});

it('Maintains order with uneven event loop ticks', async () => {
  const lock = new RWLock();

  const data: string[] = [];

  const locks = [
    LockTypes.READ,
    LockTypes.READ,
    LockTypes.WRITE,
    LockTypes.READ,
    LockTypes.READ,
    LockTypes.WRITE,
    LockTypes.READ,
  ];

  const expected = locks.map((type, index) => type + index.toString());

  const ticks = [0, 1, 2, 0, 3, 0, 1];

  const fn = async (index: number) => {
    for (let i = 0; i < ticks[index]; i++) {
      await asyncNOP();
    }
    data.push(expected[index]);
  };

  await Promise.all(
    locks.map((type, index) =>
      type === LockTypes.READ
        ? lock.withRead(() => fn(index))
        : lock.withWrite(() => fn(index))
    )
  );
  expect(data).toStrictEqual(expected);
});

@CMCDragonkai
Author

@eblocha are you talking about read-preferring or write-preferring? And what is the problem with the write-preferring rwlock implementation atm?

@eblocha

eblocha commented Jan 17, 2022

I am referring to the write-preferring lock. You illustrated it above:

read2
read1
write1
read4
read3
write2

Even though read1 was called first, it starts executing its callback last. This is because subsequent readers don't have to await acquiring the reader lock, which only completes on a later tick of the event loop. By adding a no-op, subsequent readers get pushed to the back of the event queue, and start their callbacks after the first reader has started its callback. The readers still run concurrently, but their callbacks are "started" in the order they requested the lock.

This would change the output to:

read1
read2
write1
read3
read4
write2

It adds some latency to the other readers, but ensures tasks run in the order they are received.
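
The effect of the padding can be seen in isolation, without any lock. The following is a small sketch under stated assumptions: `startOrder` and `padSecond` are hypothetical names, and the single `await asyncNOP()` in the first task only stands in for awaiting `readersLock.acquire()`, which may take a different number of ticks in the real library.

```typescript
// Same helper as in the comment above: awaiting it yields to other queued work.
const asyncNOP = async (): Promise<void> =>
  new Promise<void>((resolve) => resolve());

// Hypothetical demo: `padSecond` decides whether the second task also yields.
async function startOrder(padSecond: boolean): Promise<string[]> {
  const order: string[] = [];
  const first = (async () => {
    // Stands in for the first reader awaiting readersLock.acquire()
    await asyncNOP();
    order.push('first');
  })();
  const second = (async () => {
    if (padSecond) {
      // The padding added in the modified acquireRead
      await asyncNOP();
    }
    order.push('second');
  })();
  await Promise.all([first, second]);
  return order;
}
```

Without padding, `startOrder(false)` resolves to `['second', 'first']`, because the second task never yields. With padding, `startOrder(true)` resolves to `['first', 'second']`, because both tasks wait the same number of ticks and resume in the order they started. The ordering holds only while both paths take the same number of ticks.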

@CMCDragonkai
Author

Interesting. However, I'm not sure the solution is very robust because, like you said, it relies on the number of event loop ticks... I'm not even sure that's why it happens, though.
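
For comparison, one way to make the start order independent of tick counts is to queue requests explicitly and grant them in arrival order. The following is a minimal sketch of that idea only; it is not the approach taken in this gist, and it is not claimed to match js-async-locks. `FifoRWLock`, `Kind`, `Waiter` and `fifoDemo` are hypothetical names, and the sketch gives up both read and write preference in exchange for strict ordering.

```typescript
type Kind = 'read' | 'write';
type Waiter = { kind: Kind; grant: () => void };

// Hypothetical queue-based lock: requests are granted strictly in arrival
// order, and consecutive readers at the head of the queue are granted together.
class FifoRWLock {
  private activeReaders = 0;
  private writerActive = false;
  private queue: Waiter[] = [];

  public acquireRead(): Promise<() => void> {
    return this.enqueue('read');
  }

  public acquireWrite(): Promise<() => void> {
    return this.enqueue('write');
  }

  public async withRead<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireRead();
    try {
      return await f();
    } finally {
      release();
    }
  }

  public async withWrite<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireWrite();
    try {
      return await f();
    } finally {
      release();
    }
  }

  private enqueue(kind: Kind): Promise<() => void> {
    return new Promise<() => void>((resolve) => {
      const grant = () => {
        if (kind === 'read') this.activeReaders++;
        else this.writerActive = true;
        let released = false;
        resolve(() => {
          if (released) return; // Ignore a double release
          released = true;
          if (kind === 'read') this.activeReaders--;
          else this.writerActive = false;
          this.drain();
        });
      };
      this.queue.push({ kind, grant });
      this.drain();
    });
  }

  // Grant from the head of the queue until the head has to wait.
  private drain(): void {
    while (this.queue.length > 0) {
      const next = this.queue[0];
      const blocked =
        this.writerActive || (next.kind === 'write' && this.activeReaders > 0);
      if (blocked) return;
      this.queue.shift();
      next.grant();
    }
  }
}

// Records the order in which callbacks start, with deliberately uneven work.
async function fifoDemo(): Promise<string[]> {
  const lock = new FifoRWLock();
  const started: string[] = [];
  const kinds: Kind[] = ['read', 'read', 'write', 'read', 'read', 'write', 'read'];
  const delays = [3, 0, 2, 5, 0, 1, 0];
  await Promise.all(
    kinds.map((kind, i) => {
      const task = async () => {
        started.push(kind + i);
        await new Promise<void>((resolve) => setTimeout(resolve, delays[i]));
      };
      return kind === 'read' ? lock.withRead(task) : lock.withWrite(task);
    })
  );
  return started;
}
```

Because a waiter is only granted when it reaches the head of the queue, callbacks start in request order no matter how many ticks each acquisition or callback takes. A steady stream of readers cannot starve a queued writer here, but readers arriving behind a queued writer no longer join the readers that are currently running.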

@CMCDragonkai
Author

CMCDragonkai commented Mar 28, 2022

I've turned this into a package: https://github.com/MatrixAI/js-async-locks. Bug fixes and new features go to the package. Don't use this snippet!
