Skip to content

Instantly share code, notes, and snippets.

@GavinRay97
Created January 29, 2023 16:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save GavinRay97/a6fbdefb2dc54317e90c0b4dcb40150b to your computer and use it in GitHub Desktop.
Save GavinRay97/a6fbdefb2dc54317e90c0b4dcb40150b to your computer and use it in GitHub Desktop.
TypeScript Node.js/Browser Mutex + Read-Write Mutex using Atomics
class AsyncLock {
private _lock = new Int32Array(new SharedArrayBuffer(4)) // SharedArrayBuffer for multi-threading, 4 bytes for 32-bit integer
static INDEX = 0
static UNLOCKED = 0
static LOCKED = 1
lock() {
while (true) {
console.log("lock")
const oldValue = Atomics.compareExchange(
this._lock,
AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED
)
if (oldValue == AsyncLock.UNLOCKED) {
return
}
Atomics.wait(this._lock, AsyncLock.INDEX, AsyncLock.LOCKED) // <<< expected value at start
}
}
unlock() {
console.log("unlock")
const oldValue = Atomics.compareExchange(
this._lock,
AsyncLock.INDEX,
/* old value >>> */ AsyncLock.LOCKED,
/* new value >>> */ AsyncLock.UNLOCKED
)
if (oldValue != AsyncLock.LOCKED) {
throw new Error("Tried to unlock while not holding the mutex")
}
Atomics.notify(this._lock, AsyncLock.INDEX, 1)
}
executeLocked(fn: () => void): Promise<"ok" | "not-equal" | "timed-out" | undefined> {
const self = this
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(
self._lock,
AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED
)
if (oldValue == AsyncLock.UNLOCKED) {
fn()
self.unlock()
return
}
// @ts-expect-error
const result = Atomics.waitAsync(self._lock, AsyncLock.INDEX, AsyncLock.LOCKED)
// ^ expected value at start
return await result.value
}
}
return tryGetLock()
}
}
class AsyncReadWriteLock {
private _lock = new Int32Array(new SharedArrayBuffer(4)) // SharedArrayBuffer for multi-threading, 4 bytes for 32-bit integer
static INDEX = 0
static UNLOCKED = 0
static WRITER_LOCKED = -1
static READERS_UPDATE = 2
async lockRead() {
while (true) {
const lockState = Atomics.load(this._lock, AsyncReadWriteLock.INDEX)
if (lockState == AsyncReadWriteLock.UNLOCKED) {
const newLockState = lockState + AsyncReadWriteLock.READERS_UPDATE
const oldValue = Atomics.compareExchange(this._lock, AsyncReadWriteLock.INDEX, lockState, newLockState)
if (oldValue == lockState) {
return
}
}
// @ts-expect-error
await Atomics.waitAsync(this._lock, AsyncReadWriteLock.INDEX, lockState)
}
}
async lockWrite() {
while (true) {
const lockState = Atomics.load(this._lock, AsyncReadWriteLock.INDEX)
if (lockState == AsyncReadWriteLock.UNLOCKED) {
const oldValue = Atomics.compareExchange(
this._lock,
AsyncReadWriteLock.INDEX,
AsyncReadWriteLock.UNLOCKED,
AsyncReadWriteLock.WRITER_LOCKED
)
if (oldValue == AsyncReadWriteLock.UNLOCKED) {
return
}
}
// @ts-expect-error
await Atomics.waitAsync(this._lock, AsyncReadWriteLock.INDEX, lockState)
}
}
unlockRead() {
const lockState = Atomics.sub(this._lock, AsyncReadWriteLock.INDEX, AsyncReadWriteLock.READERS_UPDATE)
if (lockState < AsyncReadWriteLock.READERS_UPDATE) {
throw new Error("Tried to unlock read lock while not holding the lock")
}
Atomics.notify(this._lock, AsyncReadWriteLock.INDEX, 1)
}
unlockWrite() {
const oldValue = Atomics.compareExchange(
this._lock,
AsyncReadWriteLock.INDEX,
AsyncReadWriteLock.WRITER_LOCKED,
AsyncReadWriteLock.UNLOCKED
)
if (oldValue != AsyncReadWriteLock.WRITER_LOCKED) {
throw new Error("Tried to unlock write lock while not holding the lock")
}
Atomics.notify(this._lock, AsyncReadWriteLock.INDEX, 1)
}
async executeReadLocked(fn: () => Promise<void>): Promise<"ok" | "not-equal" | "timed-out" | undefined> {
const self = this
async function tryGetReadLock() {
while (true) {
const lockState = Atomics.load(self._lock, AsyncReadWriteLock.INDEX)
if (lockState >= AsyncReadWriteLock.UNLOCKED) {
const newLockState = lockState + AsyncReadWriteLock.READERS_UPDATE
const oldValue = Atomics.compareExchange(
self._lock,
AsyncReadWriteLock.INDEX,
lockState,
newLockState
)
if (oldValue == lockState) {
await fn()
self.unlockRead()
return
}
}
// @ts-expect-error
const result = Atomics.waitAsync(self._lock, AsyncReadWriteLock.INDEX, lockState)
return await result.value
}
}
return tryGetReadLock()
}
async executeWriteLocked(fn: () => Promise<void>): Promise<"ok" | "not-equal" | "timed-out" | undefined> {
const self = this
async function tryGetWriteLock() {
while (true) {
const lockState = Atomics.load(self._lock, AsyncReadWriteLock.INDEX)
if (lockState == AsyncReadWriteLock.UNLOCKED) {
const oldValue = Atomics.compareExchange(
self._lock,
AsyncReadWriteLock.INDEX,
AsyncReadWriteLock.UNLOCKED,
AsyncReadWriteLock.WRITER_LOCKED
)
if (oldValue == AsyncReadWriteLock.UNLOCKED) {
await fn()
self.unlockWrite()
return
}
}
// @ts-expect-error
const result = Atomics.waitAsync(self._lock, AsyncReadWriteLock.INDEX, lockState)
return await result.value
}
}
return tryGetWriteLock()
}
}
class ReadWriteLock {
private lock = new Int32Array(new SharedArrayBuffer(4)) // SharedArrayBuffer for multi-threading, 4 bytes for 32-bit integer
private writers = 0
private readers = 0
private writerQueue = 0
private readerQueue = 0
public async lockRead() {
Atomics.add(this.lock, 0, 1)
if (this.writers > 0 || this.writerQueue > 0) {
this.readerQueue++
while (this.writers > 0 || this.writerQueue > 0) {
await new Promise((resolve) => setImmediate(resolve))
}
this.readerQueue--
}
this.readers++
Atomics.sub(this.lock, 0, 1)
}
public unlockRead() {
Atomics.add(this.lock, 0, 1)
this.readers--
Atomics.sub(this.lock, 0, 1)
}
public async lockWrite() {
Atomics.add(this.lock, 0, 1)
if (this.readers > 0 || this.writers > 0) {
this.writerQueue++
while (this.readers > 0 || this.writers > 0) {
await new Promise((resolve) => setImmediate(resolve))
}
this.writerQueue--
}
this.writers++
Atomics.sub(this.lock, 0, 1)
}
public unlockWrite() {
Atomics.add(this.lock, 0, 1)
this.writers--
Atomics.sub(this.lock, 0, 1)
}
}
import { describe, it } from "node:test"
describe("ReadWriteLock", () => {
it("should lock and unlock", async () => {
const lock = new ReadWriteLock()
await lock.lockRead()
lock.unlockRead()
await lock.lockWrite()
lock.unlockWrite()
})
it("should lock and unlock multiple times in parallel", async () => {
const lock = new ReadWriteLock()
await Promise.all([
(async () => {
await lock.lockRead()
lock.unlockRead()
console.log("Finished 1")
})(),
(async () => {
await lock.lockWrite()
lock.unlockWrite()
console.log("Finished 2")
})(),
])
})
})
describe("AsyncReadWriteLock", () => {
it("should lock and unlock", async () => {
const lock = new AsyncReadWriteLock()
await lock.lockRead()
lock.unlockRead()
await lock.lockWrite()
lock.unlockWrite()
})
it("should lock and unlock multiple times in parallel", async () => {
const lock = new AsyncReadWriteLock()
await Promise.all([
(async () => {
await lock.lockRead()
lock.unlockRead()
console.log("Finished 1")
})(),
(async () => {
await lock.lockWrite()
lock.unlockWrite()
console.log("Finished 2")
})(),
])
})
it("should fail when unlocking read lock while not holding it", async () => {
const lock = new AsyncReadWriteLock()
try {
lock.unlockRead()
} catch (e) {
console.log("Failed as expected")
return
}
})
it("should fail when unlocking write lock while not holding it", async () => {
const lock = new AsyncReadWriteLock()
try {
lock.unlockWrite()
} catch (e) {
console.log("Failed as expected")
return
}
})
it("should fail when parallel locks have invalid order", async () => {
const lock = new AsyncReadWriteLock()
Promise.all([
(async () => {
await lock.lockWrite()
})(),
(async () => {
await lock.unlockRead()
})(),
]).catch((e) => {
console.log("Failed as expected")
})
})
})
@quanta-kt
Copy link

This is beautiful, thanks.

@GavinRay97
Copy link
Author

No problem =)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment