-
-
Save CMCDragonkai/4de5c1526fc58dac259e321db8cf5331 to your computer and use it in GitHub Desktop.
import type { MutexInterface } from 'async-mutex'; | |
import { Mutex } from 'async-mutex'; | |
/**
 * Single threaded read-preferring read write lock.
 *
 * Any number of readers may hold the lock together, while a writer holds it
 * exclusively. The first reader of a group acquires the underlying mutex on
 * behalf of the whole group and the last reader of the group releases it.
 * Readers that arrive while a group is active join it immediately, so
 * sustained read pressure can starve writers.
 */
class RWLock {
  protected _readerCount = 0;
  protected _writerCount = 0;
  protected lock: Mutex = new Mutex();
  /** Releaser of the mutex held by the current group of readers. */
  protected release: MutexInterface.Releaser = () => {};
  /**
   * Settles once the current group of readers owns the mutex.
   * Every reader awaits this, not only the one that started the acquisition.
   */
  protected readersAcquired: Promise<void> = Promise.resolve();

  /** Number of readers that hold the lock or are waiting for it. */
  public get readerCount(): number {
    return this._readerCount;
  }

  /** Number of writers that hold the lock or are waiting for it. */
  public get writerCount(): number {
    return this._writerCount;
  }

  /**
   * Runs `f` while holding the lock for reading.
   * The lock is released whether `f` resolves or rejects.
   *
   * @returns whatever `f` resolves to
   */
  public async withRead<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireRead();
    try {
      return await f();
    } finally {
      release();
    }
  }

  /**
   * Runs `f` while holding the lock for writing.
   * The lock is released whether `f` resolves or rejects.
   *
   * @returns whatever `f` resolves to
   */
  public async withWrite<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireWrite();
    try {
      return await f();
    } finally {
      release();
    }
  }

  /**
   * Acquires the lock for reading.
   *
   * @returns a function that gives up this reader's hold; calling it more
   * than once has no further effect
   */
  public async acquireRead(): Promise<() => void> {
    const readerCount = ++this._readerCount;
    // The first reader locks on behalf of the whole group
    if (readerCount === 1) {
      this.readersAcquired = this.lock.acquire().then((release) => {
        this.release = release;
      });
    }
    try {
      // Later readers must also wait here: if a writer still holds the mutex,
      // the group does not own it yet and nobody may start reading
      await this.readersAcquired;
    } catch (e) {
      // The acquisition failed, so this reader never held the lock
      --this._readerCount;
      throw e;
    }
    let released = false;
    return () => {
      if (released) {
        return;
      }
      released = true;
      const remaining = --this._readerCount;
      // The last reader unlocks
      if (remaining === 0) {
        this.release();
      }
    };
  }

  /**
   * Acquires the lock for writing, waiting for any readers or writer that
   * currently hold it.
   *
   * @returns a function that gives up the write hold; calling it more than
   * once has no further effect
   */
  public async acquireWrite(): Promise<() => void> {
    ++this._writerCount;
    let release: MutexInterface.Releaser;
    try {
      // Kept local so a writer never touches the readers' shared releaser
      release = await this.lock.acquire();
    } catch (e) {
      --this._writerCount;
      throw e;
    }
    let released = false;
    return () => {
      if (released) {
        return;
      }
      released = true;
      --this._writerCount;
      release();
    };
  }

  /** Whether any reader or writer currently holds the underlying mutex. */
  public isLocked(): boolean {
    return this.lock.isLocked();
  }

  /** Resolves once the underlying mutex reports that it is unlocked. */
  public async waitForUnlock(): Promise<void> {
    return this.lock.waitForUnlock();
  }
}
import type { MutexInterface } from 'async-mutex'; | |
import { Mutex } from 'async-mutex'; | |
/**
 * Single threaded write-preferring read write lock.
 *
 * Two mutexes are used: `writersLock` serialises writers, and `readersLock`
 * is held either by one writer or by a whole group of readers. A reader that
 * arrives while any writer is pending or active first waits for the writers
 * lock to be released, which lets writers cut in front of read pressure.
 */
class RWLock {
  protected readersLock: Mutex = new Mutex();
  protected writersLock: Mutex = new Mutex();
  /** Releaser of `readersLock` held by the current group of readers. */
  protected readersRelease: MutexInterface.Releaser = () => {};
  /**
   * Settles once the current group of readers owns `readersLock`.
   * Every reader awaits this, not only the one that started the acquisition.
   */
  protected readersAcquired: Promise<void> = Promise.resolve();
  protected readerCountBlocked = 0;
  protected _readerCount = 0;
  protected _writerCount = 0;

  /** Number of readers holding or requesting the lock, including those blocked behind writers. */
  public get readerCount(): number {
    return this._readerCount + this.readerCountBlocked;
  }

  /** Number of writers that hold the lock or are waiting for it. */
  public get writerCount(): number {
    return this._writerCount;
  }

  /**
   * Runs `f` while holding the lock for reading.
   * The lock is released whether `f` resolves or rejects.
   *
   * @returns whatever `f` resolves to
   */
  public async withRead<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireRead();
    try {
      return await f();
    } finally {
      release();
    }
  }

  /**
   * Runs `f` while holding the lock for writing.
   * The lock is released whether `f` resolves or rejects.
   *
   * @returns whatever `f` resolves to
   */
  public async withWrite<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireWrite();
    try {
      return await f();
    } finally {
      release();
    }
  }

  /**
   * Acquires the lock for reading.
   *
   * @returns a function that gives up this reader's hold; calling it more
   * than once has no further effect
   */
  public async acquireRead(): Promise<() => void> {
    if (this._writerCount > 0) {
      // Writers are pending or active: step aside until the writers lock is released
      ++this.readerCountBlocked;
      try {
        await this.writersLock.waitForUnlock();
      } finally {
        --this.readerCountBlocked;
      }
    }
    const readerCount = ++this._readerCount;
    // The first reader locks on behalf of the whole group
    if (readerCount === 1) {
      this.readersAcquired = this.readersLock.acquire().then((release) => {
        this.readersRelease = release;
      });
    }
    try {
      // Later readers must also wait here: a writer may hold or be queued on
      // the readers lock, in which case the group does not own it yet
      await this.readersAcquired;
    } catch (e) {
      // The acquisition failed, so this reader never held the lock
      --this._readerCount;
      throw e;
    }
    let released = false;
    return () => {
      if (released) {
        return;
      }
      released = true;
      const remaining = --this._readerCount;
      // The last reader unlocks
      if (remaining === 0) {
        this.readersRelease();
      }
    };
  }

  /**
   * Acquires the lock for writing: first the writers lock, then the readers
   * lock once every reader currently holding it has finished.
   *
   * @returns a function that gives up the write hold; calling it more than
   * once has no further effect
   */
  public async acquireWrite(): Promise<() => void> {
    // Counted before waiting so that new readers are held back straight away
    ++this._writerCount;
    let writersRelease: MutexInterface.Releaser;
    try {
      writersRelease = await this.writersLock.acquire();
    } catch (e) {
      --this._writerCount;
      throw e;
    }
    let readersRelease: MutexInterface.Releaser;
    try {
      // Kept local so a writer never touches the readers' shared releaser
      readersRelease = await this.readersLock.acquire();
    } catch (e) {
      writersRelease();
      --this._writerCount;
      throw e;
    }
    let released = false;
    return () => {
      if (released) {
        return;
      }
      released = true;
      readersRelease();
      writersRelease();
      --this._writerCount;
    };
  }

  /** Whether either underlying mutex is currently held. */
  public isLocked(): boolean {
    return this.readersLock.isLocked() || this.writersLock.isLocked();
  }

  /** Resolves once both underlying mutexes have reported that they are unlocked. */
  public async waitForUnlock(): Promise<void> {
    await Promise.all([
      this.readersLock.waitForUnlock(),
      this.writersLock.waitForUnlock(),
    ]);
    return;
  }
}
Using the same operations on the write-preferring lock we get this output instead:
read2
read1
write1
read4
read3
write2
At first I thought it would be an unbiased rwlock, because both subsequent readers and subsequent writers could acquire the readers lock. However, because the writer has less work to do to acquire the write lock, the writers always win and block any subsequent readers. In any case, this allows writers to interrupt sustained pressure from concurrent readers.
I was able to get the reads/writes to happen in order, using an async no-op. I'm still not sure if this will fail if the number of ticks needed to `.acquire()` the lock changes.
/**
 * Async no-op: returns a promise that is resolved immediately with `undefined`.
 *
 * Awaiting it suspends the caller and resumes it on a later tick, which gives
 * code that was already queued a chance to run first.
 *
 * NOTE(review): callers use this to balance tick counts between code paths, so
 * the exact number of ticks it takes matters. Presumably any rewrite (for
 * example to `Promise.resolve()`) changes that number; re-run the ordering
 * tests before touching it.
 */
const asyncNOP = async () => new Promise((resolve) => resolve(undefined));
We can use this in acquireRead
to ensure order is maintained:
/**
 * Acquires the lock for reading, keeping readers in request order.
 *
 * A reader that finds writers pending or active waits for the writers lock
 * to be unlocked once per writer counted at the time of the request. The
 * first reader of a group then acquires the readers lock; every other reader
 * awaits a no-op instead so that both paths take a comparable number of ticks.
 *
 * @returns a function that gives up this reader's hold; the last reader to
 * call it releases the readers lock
 */
public async acquireRead(): Promise<() => void> {
if (this._writerCount > 0) {
++this.readerCountBlocked;
// Snapshot: only writers already counted at this point are waited for
const writerCount = this._writerCount;
// Wait for every writer that came before us to unlock, not just the first
// NOTE(review): assumes each waitForUnlock() resolution corresponds to one
// writer releasing - confirm against the async-mutex version in use
for (let i = 0; i < writerCount; i++) {
await this.writersLock.waitForUnlock();
}
--this.readerCountBlocked;
}
const readerCount = ++this._readerCount;
// The first reader locks
if (readerCount === 1) {
this.readersRelease = await this.readersLock.acquire();
} else {
// To ensure we use the same number of event loop ticks
// whether we need to acquire the lock or not
// NOTE(review): this path does not wait for the first reader's acquire()
// to finish - verify a later reader cannot start before the lock is held
await asyncNOP();
}
return () => {
const readerCount = --this._readerCount;
// The last reader unlocks
if (readerCount === 0) {
this.readersRelease();
}
};
}
Notice that we also need to .waitForUnlock()
for every writer before us now, otherwise we will just be waiting on the first writer that was before us.
Test cases, as a bonus (each entry's index is appended to its lock type, e.g. `['read0', 'read1', 'write2', ...]`):
/**
 * The two kinds of lock request exercised by the ordering tests.
 *
 * Declared as a frozen-literal object plus a union type of the same name
 * instead of an `enum`: `LockTypes.READ` / `LockTypes.WRITE` still work as
 * values and `LockTypes` still works as a type, but the type is the plain
 * string union `'read' | 'write'`.
 */
const LockTypes = {
  READ: 'read',
  WRITE: 'write',
} as const;
type LockTypes = (typeof LockTypes)[keyof typeof LockTypes];
/** One table-driven scenario: the lock requests, in the order they are issued. */
interface TestCase {
  locks: LockTypes[];
}
/**
 * Request sequences fed to the table-driven ordering test.
 * Each inner list is one scenario, wrapped into a `TestCase` below.
 */
const tests: TestCase[] = [
  // A writer squeezed between two readers
  [LockTypes.READ, LockTypes.WRITE, LockTypes.READ],
  // Writer first
  [LockTypes.WRITE, LockTypes.READ],
  // Reader first
  [LockTypes.READ, LockTypes.WRITE],
  // Groups of readers separated by writers
  [
    LockTypes.READ,
    LockTypes.READ,
    LockTypes.WRITE,
    LockTypes.READ,
    LockTypes.READ,
    LockTypes.WRITE,
    LockTypes.READ,
  ],
].map((locks) => ({ locks }));
// Every request appends its own label when its callback runs; the labels must
// come out in the same order the requests were issued.
it.each(tests)('Maintains order: $locks', async ({ locks }) => {
  const rwLock = new RWLock();
  const observed: string[] = [];
  // Label each request with its kind and position, e.g. 'read0', 'write1'
  const labels = locks.map((kind, position) => `${kind}${position}`);
  const record = async (position: number) => observed.push(labels[position]);

  // Issue every request synchronously, in table order
  const pending: Promise<number>[] = [];
  locks.forEach((kind, position) => {
    const task = () => record(position);
    pending.push(
      kind === LockTypes.READ ? rwLock.withRead(task) : rwLock.withWrite(task),
    );
  });
  await Promise.all(pending);

  expect(observed).toStrictEqual(labels);
});
// Same ordering check, but each callback first awaits a different number of
// no-ops before recording itself.
it('Maintains order with uneven event loop ticks', async () => {
  const rwLock = new RWLock();
  const observed: string[] = [];
  const locks = [
    LockTypes.READ,
    LockTypes.READ,
    LockTypes.WRITE,
    LockTypes.READ,
    LockTypes.READ,
    LockTypes.WRITE,
    LockTypes.READ,
  ];
  // Label each request with its kind and position, e.g. 'read0', 'write2'
  const labels = locks.map((kind, position) => `${kind}${position}`);
  // Extra async hops taken by the callback at the same position in `locks`
  const ticks = [0, 1, 2, 0, 3, 0, 1];
  const record = async (position: number) => {
    let remaining = ticks[position];
    while (remaining > 0) {
      await asyncNOP();
      remaining -= 1;
    }
    observed.push(labels[position]);
  };

  // Issue every request synchronously, in list order
  const pending = locks.map((kind, position) => {
    const task = () => record(position);
    return kind === LockTypes.READ
      ? rwLock.withRead(task)
      : rwLock.withWrite(task);
  });
  await Promise.all(pending);

  expect(observed).toStrictEqual(labels);
});
@eblocha are you talking about read-preferring or write-preferring? And what is the problem with the write-preferring rwlock implementation atm?
I am referring to the write-preferring lock. You illustrated it above:
read2
read1
write1
read4
read3
write2
Even though read1 was called first, it starts executing the callback last. This is because subsequent readers don't have to await
acquiring the reader lock, which happens after one event loop. By adding a no-op, subsequent readers get pushed to the back of the event queue, and start their callbacks after the first reader has started its callback. The readers still run concurrently, but their callbacks are "started" in the order they requested the lock.
This would change the output to:
read1
read2
write1
read3
read4
write2
It adds some latency to the other readers, but ensures tasks run in the order they are received.
Interesting, however I'm not sure if the solution is very robust because like you said it relies on a number of event loop ticks... I'm not even sure if that's why it does it though.
I've turned this into a package: https://github.com/MatrixAI/js-async-locks. Bug fixes and new features go to the package. Don't use this snippet!
When using the read-preferring RWLock, we get this behaviour:
Output:
See that the reads are accumulated together, and read pressure will starve writes. Furthermore, notice that `read1` completes last; this is because `read1` has more work to do in acquiring the read lock and releasing it, while the other read operations are just incrementing and decrementing the counter.