Skip to content

Instantly share code, notes, and snippets.

@crabmusket

crabmusket/readme.md

Last active Aug 12, 2019
Embed
What would you like to do?
Async semaphore utility class for Deno

semaphore.ts

An asynchronous semaphore utility class meant to work with Deno. You can use it to implement rate-limiting and other concurrency patterns.

Usage example:

import { Semaphore } from "https://gist.githubusercontent.com/crabmusket/681b4f81ed05716f8dd10ac88d1d960b/raw/5e7c18013e896425d1048324f8668932f23437df/semaphore.ts";

const semaphore = new Semaphore(5);
semaphore.signal(3);
await semaphore.wait(1);
console.log('used 1, yay!');
if (semaphore.tryWait(3)) {
  // this isn't reached, because there were only 2 left
}

If you don't want to limit the maximum capacity of your semaphore, just use new Semaphore(Infinity).

Note that the following operations constitute programmer error and will therefore throw exceptions:

  • Attempting to signal or wait for 0, a negative number, NaN or Infinity
  • Attempting to signal or wait an amount greater than the Semaphore's capacity as passed in its constructor

To run tests:

deno https://gist.githubusercontent.com/crabmusket/681b4f81ed05716f8dd10ac88d1d960b/raw/5e7c18013e896425d1048324f8668932f23437df/semaphore_test.ts

Other projects

https://github.com/notenoughneon/await-semaphore

  • Doesn't allow acquiring more than 1 count of resource at a time
  • Can only release resource from the closure that acquired it (therefore I would argue it's not really a semaphore)

https://github.com/abrkn/semaphore.js

  • Doesn't allow acquiring more than 1 count of resource at a time
  • Not Deno-compatible
/**
* A simple asynchronous (but not parallel) semaphore. For information about
* semaphores, see https://en.wikipedia.org/wiki/Semaphore_(programming)
*
* To create a "binary semaphore", use `new Semaphore(1)`. Note that a "mutex"
* is not the same as a binary semaphore.
*/
export class Semaphore {
_capacity: number;
_available: number;
_queued: [number, () => void][];
/**
* Create a Semaphore with a limited quantity of resources. If you actually want
* an unlimited quantity, use `new Semaphore(Infinity)`.
*/
constructor(capacity: number) {
if (capacity <= 0) {
throw new Error(`cannot construct Semaphore with capacity <= 0`);
}
if (isNaN(capacity)) {
throw new Error(`cannot construct Semaphore with capacity of NaN`);
}
this._capacity = capacity;
this._available = 0;
this._queued = [];
}
/** Get the total capacity of resources this semaphore is limited to. */
capacity() {
return this._capacity;
}
/** Get the current available quantity of resources. */
available() {
return this._available;
}
/** Wait until a particular quantity of resources becomes available. */
async wait(count: number): Promise<void> {
if (count <= 0) {
throw new Error(`cannot consume <= 0 resources`);
}
if (isNaN(count) || !isFinite(count)) {
throw new Error(`cannot consume NaN or Infinity resources`);
}
if (count > this._capacity) {
throw new Error(
`attempted to consume ${count}, capacity is only ${this._capacity}`
);
}
if (count <= this._available) {
this._available -= count;
return Promise.resolve();
}
return new Promise((resolve, reject) => {
this._queued.push([count, resolve]);
});
}
/**
* Check if a particular quantity of resources is available, and if so, consume
* them and return true.
*/
tryWait(count: number): boolean {
if (count <= 0) {
throw new Error(`cannot consume <= 0 resources`);
}
if (isNaN(count) || !isFinite(count)) {
throw new Error(`cannot consume NaN or Infinity resources`);
}
if (count > this._capacity) {
throw new Error(
`attempted to consume ${count}, capacity is only ${this._capacity}`
);
}
if (count <= this._available) {
this._available -= count;
return true;
}
return false;
}
/**
* Increase the quantity of resources available. Cannot increase above this
* Semaphore's capacity provided in the constructor.
*/
signal(count: number) {
if (count <= 0) {
throw new Error(`cannot provide <= 0 resources`);
}
if (isNaN(count) || !isFinite(count)) {
throw new Error(`cannot provide NaN or Infinity resources`);
}
this._available += count;
if (this._available > this._capacity) {
this._available = this._capacity;
}
while (this._queued.length && this._available > 0) {
let [taskCount, resolve] = this._queued[0];
if (taskCount <= this._available) {
this._available -= taskCount;
resolve();
this._queued.shift();
}
}
}
}
import { test, runTests } from "https://deno.land/std@v0.11.0/testing/mod.ts";
import {
assert,
assertEquals,
assertThrows,
assertThrowsAsync
} from "https://deno.land/std@v0.11.0/testing/asserts.ts";
import { delay } from "https://deno.land/std@826deb1/util/async.ts"; // not published yet
import { Semaphore } from "./semaphore.ts";
test(function testConstructor() {
new Semaphore(1);
new Semaphore(5);
new Semaphore(Infinity);
assertThrows(() => new Semaphore(-1));
assertThrows(() => new Semaphore(0));
assertThrows(() => new Semaphore(-Infinity));
assertThrows(() => new Semaphore(NaN));
});
test(async function testWaiting() {
const s = new Semaphore(1);
s.signal(1);
await s.wait(1);
let hasConsumed = false;
s.wait(1).then(() => {
hasConsumed = true;
});
await delay(100);
assertEquals(false, hasConsumed);
s.signal(1);
await delay(1);
assertEquals(true, hasConsumed);
});
test(async function testBlocking() {
const s = new Semaphore(1);
const before = Date.now();
setTimeout(() => s.signal(1), 150);
await s.wait(1);
const after = Date.now();
assert(after - before >= 150);
});
test(async function testQueueing() {
let counter = 0;
function increment() {
counter += 1;
}
const s = new Semaphore(10);
for (let i = 0; i < 20; i += 1) {
s.wait(1).then(increment);
}
assertEquals(0, counter);
s.signal(1);
await delay(1);
assertEquals(1, counter);
s.signal(1000);
await delay(1);
assertEquals(11, counter);
s.signal(1);
await delay(1);
assertEquals(12, counter);
s.signal(10);
await delay(1);
assertEquals(20, counter);
});
test(async function testInvalidConsumptionOrProduction() {
const s = new Semaphore(5);
s.signal(10);
assertThrows(() => s.tryWait(Infinity));
assertThrows(() => s.tryWait(NaN));
assertThrows(() => s.tryWait(-Infinity));
assertThrows(() => s.tryWait(-2));
assertThrows(() => s.tryWait(0));
assertThrowsAsync(async () => await s.wait(Infinity));
assertThrowsAsync(async () => await s.wait(NaN));
assertThrowsAsync(async () => await s.wait(-Infinity));
assertThrowsAsync(async () => await s.wait(-2));
assertThrowsAsync(async () => await s.wait(0));
assertThrows(() => s.signal(Infinity));
assertThrows(() => s.signal(NaN));
assertThrows(() => s.signal(-1));
assertThrows(() => s.signal(-Infinity));
assertThrows(() => s.signal(0));
});
test(async function testConsumeTooMuch() {
const s = new Semaphore(5);
s.signal(10);
assertThrows(() => s.tryWait(10));
assertThrowsAsync(async () => await s.wait(10));
});
if (import.meta.main) {
runTests();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.