Skip to content

Instantly share code, notes, and snippets.

@cahilfoley
Created February 25, 2020 03:10
Show Gist options
  • Save cahilfoley/4b1b2f3fa9e2f9652ee1d8501443b5ca to your computer and use it in GitHub Desktop.
Save cahilfoley/4b1b2f3fa9e2f9652ee1d8501443b5ca to your computer and use it in GitHub Desktop.
Simple TypeScript implementation of a semaphore
import { cpus } from 'os'
/**
 * Handle returned to a task once [[Semaphore.acquire]] has granted it access.
 * The holder calls `release` when it is done with the resource.
 */
interface Lock {
  release: () => void
}
/**
 * The settle callbacks of a pending [[Semaphore]] request: the task has been queued
 * but has not been allowed to start yet.
 */
interface WaitingPromise {
  /** Starts the task by handing it a lock. */
  resolve: (lock: Lock) => void
  /** Cancels the task, optionally with the reason. */
  reject: (err?: Error) => void
}
/**
 * A [[Semaphore]] is a tool that is used to control concurrent access to a common resource. This implementation
 * is used to apply a max-parallelism threshold: at most `max` locks are held at any one time and further
 * requests are queued and granted in the order they were made.
 */
export class Semaphore {
  /** Number of locks currently held. */
  private running = 0
  /** Requests that could not be granted immediately, oldest first. */
  private waiting: WaitingPromise[] = []
  /** When true, lock requests and purges are reported on the console. */
  private debugLogging = true
  /** Bumped by every purge; a lock issued under an older generation is stale and its release is ignored. */
  private generation = 0

  /**
   * @param label Name of the guarded resource, used in log and error messages.
   * @param max Maximum number of locks that may be held at once. Defaults to the number of CPUs.
   * @throws Error if `max` is less than 1 or is not a number.
   */
  constructor(private label: string, public max: number = cpus().length) {
    // Written as a negated `>=` so that NaN is rejected as well as values below 1
    if (!(max >= 1)) {
      throw new Error(
        `The ${label} semaphore was created with a max value of ${max} but the max value cannot be less than 1`,
      )
    }
  }

  /**
   * Builds the lock handed to a task. Each lock frees its slot at most once, and
   * not at all if the semaphore has been purged since the lock was issued.
   */
  private createLock(): Lock {
    const issuedIn = this.generation
    let released = false
    return {
      release: () => {
        // A repeated release must not free a slot that belongs to another task
        if (released) return
        released = true
        // The purge already reset the counter, so this slot no longer exists
        if (issuedIn !== this.generation) return
        this.release()
      },
    }
  }

  /**
   * Allows waiting tasks to start, oldest first, while there is spare capacity.
   */
  private take = () => {
    // Loops rather than starting a single task so that raising `max` at runtime drains the queue
    while (this.running < this.max) {
      // Get the next task from the queue
      const task = this.waiting.shift()
      if (task === undefined) return
      this.running++
      // Resolve the promise to allow it to start, provide a release function
      task.resolve(this.createLock())
    }
  }

  /**
   * Acquire a lock on the target resource.
   *
   * The returned promise resolves with a lock as soon as a slot is free, and rejects if the semaphore is
   * purged while the request is still waiting. It is critical that the lock's `release` function is called
   * when the task is finished with the resource. Calling it more than once has no further effect.
   */
  acquire = (): Promise<Lock> => {
    if (this.debugLogging) {
      console.log(
        `Lock requested for the ${this.label} resource - ${this.running} active, ${this.waiting.length} waiting`,
      )
    }
    if (this.running < this.max) {
      this.running++
      return Promise.resolve(this.createLock())
    }
    if (this.debugLogging) {
      console.log(
        `Max active locks hit for the ${this.label} resource - there are ${this.running} tasks running and ${this.waiting.length} waiting.`,
      )
    }
    return new Promise<Lock>((resolve, reject) => {
      this.waiting.push({ resolve, reject })
    })
  }

  /**
   * Frees one slot and lets the next waiting task start. Reached only through a lock's `release` function.
   */
  private release = () => {
    this.running--
    this.take()
  }

  /**
   * Purge all waiting tasks from the [[Semaphore]].
   *
   * Every waiting request is rejected and the active count is reset to zero. Locks issued before the purge
   * become stale: releasing them afterwards does nothing, so they cannot free slots held by newer locks.
   */
  purge = () => {
    if (this.debugLogging) {
      console.info(
        `Purge requested on the ${this.label} semaphore, ${this.waiting.length} pending tasks will be cancelled.`,
      )
    }
    const cancelled = this.waiting
    this.waiting = []
    this.running = 0
    this.generation++
    for (const task of cancelled) {
      task.reject(
        new Error('The semaphore was purged and as a result this task has been cancelled'),
      )
    }
  }
}
/***********************
* Example usage below *
***********************/
// Resolves after `ms` milliseconds, carrying no value
const pause = (ms: number) =>
  new Promise(wake => {
    setTimeout(wake, ms)
  })
// Semaphore used by the example: allows up to 6 operations to hold a lock at the same time
const exampleResource = new Semaphore('example', 6)
/**
 * Runs one simulated operation under the example semaphore: waits for a lock, does
 * one second of "work", then releases the lock.
 *
 * The release sits in a `finally` block so the slot is given back even if the work throws.
 */
async function runOperation(): Promise<void> {
  // Wait until lock is acquired to do anything
  const lock = await exampleResource.acquire()
  try {
    // Simulated operation that takes 1 second to finish
    await pause(1000)
  } finally {
    // Done with the resource now, release the lock to let others use it
    lock.release()
  }
}
// Launch 50 operations without awaiting them; each one logs the whole seconds elapsed when it finishes
const start = Date.now()
Array.from({ length: 50 }, (_, index) => index).forEach(index => {
  runOperation().then(() => {
    const elapsedSecs = Math.floor((Date.now() - start) / 1000)
    console.log(`Operation ${index} finished in ${elapsedSecs} secs`)
  })
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment